twalthr commented on code in PR #26113:
URL: https://github.com/apache/flink/pull/26113#discussion_r1954633723


##########
docs/content/docs/dev/table/sql/queries/joins.md:
##########
@@ -326,16 +326,55 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
-Array Expansion
+Array, Multiset and Map Expansion
 --------------
 
-Returns a new row for each element in the given array. Unnesting `WITH 
ORDINALITY` is not yet supported.
+Unnest returns a new row for each element in the given array, multiset or map. 
Supports both `CROSS JOIN` and `LEFT JOIN`.
+```sql
+-- Returns a new row for each element in a constant array
+SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY["shirt", "pants", "hat"])
+
+id       product_name
+=======  ============
+order_1  shirt
+order_1  pants
+order_1  hat
+
+-- Returns a new row for each element in the array
+-- assuming a Orders table with an array column `product_names`
+SELECT order_id, product_name
+FROM Orders 
+    CROSS JOIN UNNEST(product_names) AS t(product_name)
+```
+
+Unnesting `WITH ORDINALITY` is also supported.
+
 
 ```sql
-SELECT order_id, tag
-FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
+-- Returns a new row for each element in a constant array and its position in 
the array
+SELECT * 
+FROM (VALUES('order_1'))

Review Comment:
   ```suggestion
   FROM (VALUES ('order_1'), ('order_2'))
   ```
   it might be easier to understand with a space after VALUES and a second value



##########
docs/content/docs/dev/table/sql/queries/joins.md:
##########
@@ -326,16 +326,55 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
-Array Expansion
+Array, Multiset and Map Expansion
 --------------
 
-Returns a new row for each element in the given array. Unnesting `WITH 
ORDINALITY` is not yet supported.
+Unnest returns a new row for each element in the given array, multiset or map. 
Supports both `CROSS JOIN` and `LEFT JOIN`.
+```sql
+-- Returns a new row for each element in a constant array
+SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY["shirt", "pants", "hat"])

Review Comment:
   is this tested? `"` should be single quotes
   
   ```suggestion
   SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat'])
   ```



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala:
##########
@@ -315,4 +315,279 @@ class UnnestITCase extends BatchTestBase {
       Seq(row('a', 1), row('a', 2), row('a', 3)))
   }
 
+  @Test
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {
+    checkResult(
+      "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]) WITH 
ORDINALITY",
+      Seq(row('a', 1, 1), row('a', 2, 2), row('a', 3, 3))
+    )
+  }
+
+  @Test
+  def testUnnestArrayWithOrdinality(): Unit = {
+    val data = List(
+      row(1, Array(12, 45)),
+      row(2, Array(41, 5)),
+      row(3, Array(18, 42))
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, Types.PRIMITIVE_ARRAY(Types.INT)),

Review Comment:
   is there a version of this method that takes data type? RowTypeInfo is 
legacy.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala:
##########
@@ -379,4 +380,486 @@ class UnnestITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mo
     val expected = List("a,1", "a,2", "a,3")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {
+    val sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 
3]) WITH ORDINALITY"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("a,1,1", "a,2,2", "a,3,3")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(12, 45)),
+      (2, Array(41, 5)),
+      (3, Array(18, 42))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality 
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, 
ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", 
"3,42,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1L, Array("Hi", "w")),
+      (2, 2L, Array("Hello", "k")),
+      (3, 2L, Array("Hello world", "x"))
+    )
+
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A 
(s, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello 
world,1", "3,x,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(Array(1, 2), Array(3))),
+      (2, Array(Array(4, 5), Array(6, 7))),
+      (3, Array(Array(8)))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'nested_array)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, `element`, element_pos
+        |FROM T
+        |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, 
element_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,[1, 2],1,1,1",
+      "1,[1, 2],1,2,2",
+      "1,[3],2,3,1",
+      "2,[4, 5],1,4,1",
+      "2,[4, 5],1,5,2",
+      "2,[6, 7],2,6,1",
+      "2,[6, 7],2,7,2",
+      "3,[8],1,8,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultisetWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1, "Hi"),
+      (1, 2, "Hello"),
+      (2, 2, "World"),
+      (3, 3, "Hello world")
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a)
+        |SELECT a, word, pos
+        |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,Hi,1",
+      "1,Hello,2",
+      "2,World,1",
+      "3,Hello world,1"
+    )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMapWithOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", "11")
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", "21")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, 
Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT id, k, v, pos
+                     |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS 
f(k, v, pos)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,11", "2,c,20", "2,d,21")
+
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  def testUnnestForMapOfRowsWitOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("a", "a"), Row.of(10: Integer))
+          map.put(Row.of("b", "b"), Row.of(11: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("c", "c"), Row.of(20: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(3), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("d", "d"), Row.of(30: Integer))
+          map.put(Row.of("e", "e"), Row.of(31: Integer))
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("a", "b"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.MAP(Types.ROW(Types.STRING, Types.STRING), 
Types.ROW(Types.INT())))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH 
ORDINALITY as f (k, v, o)"
+    val result = tEnv.sqlQuery(sqlQuery)
+    TestSinkUtil.addValuesSink(
+      tEnv,
+      "MySink",
+      List("a", "k", "v", "o"),
+      List(
+        DataTypes.INT,
+        DataTypes.ROW(DataTypes.STRING(), DataTypes.STRING()),
+        DataTypes.ROW(DataTypes.INT()),
+        DataTypes.INT.notNull()),
+      ChangelogMode.all()
+    )
+    result.executeInsert("MySink").await()
+
+    val expected =
+      List("1,a,a,10", "1,b,b,11", "2,c,c,20", "3,d,d,30", "3,e,e,31")
+    val resultWithoutOrd = assertAndRemoveOrdinality(
+      TestValuesTableFactory.getResultsAsStrings("MySink").sorted.toList,
+      2)
+    assertThat(resultWithoutOrd).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1),
+        Array("a", "b"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("x", "10")
+          map.put("y", "20")
+          map
+        }),
+      Row.of(
+        Int.box(2),
+        Array("c", "d"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("z", "30")
+          map.put("w", "40")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "array_data", "map_data"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.OBJECT_ARRAY(Types.STRING),
+        Types.MAP(Types.STRING, Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'array_data, 'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, map_key, map_val, map_pos
+        |FROM T
+        |CROSS JOIN UNNEST(array_data) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS B(map_key, map_val, 
map_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutOrdinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List(
+      "1,a,1,x,10",
+      "1,a,1,y,20",
+      "1,b,2,x,10",
+      "1,b,2,y,20",
+      "2,c,1,z,30",
+      "2,c,1,w,40",
+      "2,d,2,z,30",
+      "2,d,2,w,40"
+    )
+
+    assertThat(resultsWithoutOrdinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForEmptyArray(): Unit = {
+    val data = List(
+      (1, Array[Int]())
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, 
ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List()
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForMapWithNullValues(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", null)
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", null)
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, 
Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, k, v, pos
+        |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,null", "2,c,20", "2,d,null")
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (2, Array((20, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      "SELECT a, b, s, t, o FROM T, UNNEST(T.b) WITH ORDINALITY AS A(s, t, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "2,[20,41.6, 14,45.2136],20,41.6,1",
+      "2,[20,41.6, 14,45.2136],14,45.2136,2",
+      "3,[18,42.6],18,42.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsWithNestedFilterWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM (
+        |   SELECT a, b1, b2, ord FROM
+        |       (SELECT a, b FROM MyTable) T
+        |       CROSS JOIN
+        |       UNNEST(T.b) WITH ORDINALITY as S(b1, b2, ord)
+        |       WHERE S.b1 >= 12
+        |   ) tmp
+        |WHERE b2 <> '42.6' AND ord <> 2
+    """.stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,45.6,1", "2,13,41.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultiSetOfRowsFromCollectResultWithOrdinality(): Unit = {
+    val data = List(
+      (1, (12, "45.6")),
+      (2, (12, "45.612")),
+      (2, (13, "41.6")),
+      (3, (14, "45.2136")),
+      (3, (18, "42.6")))
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |WITH T1 AS (SELECT a, COLLECT(b) as `set` FROM T GROUP BY a)
+        |SELECT a, id, point, o FROM T1
+        |CROSS JOIN UNNEST(T1.`set`) WITH ORDINALITY AS A(id, point, o) WHERE 
a < 3
+      """.stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,45.6,1", "2,12,45.612,1", "2,13,41.6,2")
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)

Review Comment:
   introduce a parameterized method for this repeated logic



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala:
##########
@@ -315,4 +315,279 @@ class UnnestITCase extends BatchTestBase {
       Seq(row('a', 1), row('a', 2), row('a', 3)))
   }
 
+  @Test
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {
+    checkResult(
+      "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]) WITH 
ORDINALITY",
+      Seq(row('a', 1, 1), row('a', 2, 2), row('a', 3, 3))
+    )
+  }
+
+  @Test
+  def testUnnestArrayWithOrdinality(): Unit = {
+    val data = List(
+      row(1, Array(12, 45)),
+      row(2, Array(41, 5)),
+      row(3, Array(18, 42))
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, Types.PRIMITIVE_ARRAY(Types.INT)),
+      "a, b")
+
+    checkResult(
+      """
+        |SELECT a, number, ordinality 
+        |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, ordinality)
+        |""".stripMargin,
+      Seq(row(1, 12, 1), row(1, 45, 2), row(2, 41, 1), row(2, 5, 2), row(3, 
18, 1), row(3, 42, 2))
+    )
+  }
+
+  @Test
+  def testUnnestFromTableWithOrdinality(): Unit = {
+    val data = List(
+      row(1, 1L, Array("Hi", "w")),
+      row(2, 2L, Array("Hello", "k")),
+      row(3, 2L, Array("Hello world", "x"))
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, Types.LONG, Types.OBJECT_ARRAY(Types.STRING)),
+      "a, b, c")
+
+    checkResult(
+      "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A (s, o)",
+      Seq(
+        row(1, "Hi", 1),
+        row(1, "w", 2),
+        row(2, "Hello", 1),
+        row(2, "k", 2),
+        row(3, "Hello world", 1),
+        row(3, "x", 2))
+    )
+  }
+
+  @Test
+  def testUnnestArrayOfArrayWithOrdinality(): Unit = {
+    val data = List(
+      row(1, Array(Array(1, 2), Array(3))),
+      row(2, Array(Array(4, 5), Array(6, 7))),
+      row(3, Array(Array(8)))
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, 
Types.OBJECT_ARRAY(Types.PRIMITIVE_ARRAY(Types.INT))),
+      "id, nested_array")
+
+    checkResult(
+      """
+        |SELECT id, array_val, array_pos, `element`, element_pos
+        |FROM T
+        |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, 
element_pos)
+        |""".stripMargin,
+      Seq(
+        row(1, Array(1, 2), 1, 1, 1),
+        row(1, Array(1, 2), 1, 2, 2),
+        row(1, Array(3), 2, 3, 1),
+        row(2, Array(4, 5), 1, 4, 1),
+        row(2, Array(4, 5), 1, 5, 2),
+        row(2, Array(6, 7), 2, 6, 1),
+        row(2, Array(6, 7), 2, 7, 2),
+        row(3, Array(8), 1, 8, 1)
+      )
+    )
+  }
+
+  @Test
+  def testUnnestMultisetWithOrdinality(): Unit = {
+    val data = List(
+      row(1, 1, "Hi"),
+      row(1, 2, "Hello"),
+      row(2, 2, "World"),
+      row(3, 3, "Hello world")
+    )
+    registerCollection("T", data, new RowTypeInfo(Types.INT, Types.INT, 
Types.STRING), "a, b, c")
+
+    checkResult(
+      """
+        |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a)
+        |SELECT a, word, pos
+        |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos)
+        |""".stripMargin,
+      Seq(row(1, "Hi", 1), row(1, "Hello", 2), row(2, "World", 1), row(3, 
"Hello world", 1))
+    )
+  }
+
+  @Test
+  def testUnnestMapWithOrdinality(): Unit = {
+    val data = List(
+      row(1, Map("a" -> "10", "b" -> "11").asJava),
+      row(2, Map("c" -> "20", "d" -> "21").asJava)
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, Types.MAP(Types.STRING, Types.STRING)),
+      "id, map_data")
+
+    checkResult(
+      """
+        |SELECT id, k, v
+        |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos)
+        |""".stripMargin,
+      Seq(row(1, "a", "10"), row(1, "b", "11"), row(2, "c", "20"), row(2, "d", 
"21"))
+    )
+  }
+
+  @Test
+  def testUnnestForMapOfRowsWithOrdinality(): Unit = {
+    val data = List(
+      row(
+        1, {
+          val map = new java.util.HashMap[Row, Row]()

Review Comment:
   use `Map(...).asJava` everywhere



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala:
##########
@@ -379,4 +380,486 @@ class UnnestITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mo
     val expected = List("a,1", "a,2", "a,3")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {
+    val sqlQuery = "SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 
3]) WITH ORDINALITY"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("a,1,1", "a,2,2", "a,3,3")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(12, 45)),
+      (2, Array(41, 5)),
+      (3, Array(18, 42))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality 
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, 
ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,1", "1,45,2", "2,41,1", "2,5,2", "3,18,1", 
"3,42,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1L, Array("Hi", "w")),
+      (2, 2L, Array("Hello", "k")),
+      (3, 2L, Array("Hello world", "x"))
+    )
+
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, s, o FROM T, UNNEST(T.c) WITH ORDINALITY as A 
(s, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,Hi,1", "1,w,2", "2,Hello,1", "2,k,2", "3,Hello 
world,1", "3,x,2")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfArrayWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array(Array(1, 2), Array(3))),
+      (2, Array(Array(4, 5), Array(6, 7))),
+      (3, Array(Array(8)))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'nested_array)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, `element`, element_pos
+        |FROM T
+        |CROSS JOIN UNNEST(nested_array) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(array_val) WITH ORDINALITY AS B(`element`, 
element_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,[1, 2],1,1,1",
+      "1,[1, 2],1,2,2",
+      "1,[3],2,3,1",
+      "2,[4, 5],1,4,1",
+      "2,[4, 5],1,5,2",
+      "2,[6, 7],2,6,1",
+      "2,[6, 7],2,7,2",
+      "3,[8],1,8,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultisetWithOrdinality(): Unit = {
+    val data = List(
+      (1, 1, "Hi"),
+      (1, 2, "Hello"),
+      (2, 2, "World"),
+      (3, 3, "Hello world")
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b, 
'c)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |WITH T1 AS (SELECT a, COLLECT(c) as words FROM T GROUP BY a)
+        |SELECT a, word, pos
+        |FROM T1 CROSS JOIN UNNEST(words) WITH ORDINALITY AS A(word, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    val sink = new TestingRetractSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,Hi,1",
+      "1,Hello,2",
+      "2,World,1",
+      "3,Hello world,1"
+    )
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMapWithOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", "11")
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", "21")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, 
Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT id, k, v, pos
+                     |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS 
f(k, v, pos)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,11", "2,c,20", "2,d,21")
+
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  def testUnnestForMapOfRowsWitOrdinality(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("a", "a"), Row.of(10: Integer))
+          map.put(Row.of("b", "b"), Row.of(11: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("c", "c"), Row.of(20: Integer))
+          map
+        }),
+      Row.of(
+        Int.box(3), {
+          val map = new java.util.HashMap[Row, Row]()
+          map.put(Row.of("d", "d"), Row.of(30: Integer))
+          map.put(Row.of("e", "e"), Row.of(31: Integer))
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("a", "b"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.MAP(Types.ROW(Types.STRING, Types.STRING), 
Types.ROW(Types.INT())))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = "SELECT a, k, v, o FROM T CROSS JOIN UNNEST(b) WITH 
ORDINALITY as f (k, v, o)"
+    val result = tEnv.sqlQuery(sqlQuery)
+    TestSinkUtil.addValuesSink(
+      tEnv,
+      "MySink",
+      List("a", "k", "v", "o"),
+      List(
+        DataTypes.INT,
+        DataTypes.ROW(DataTypes.STRING(), DataTypes.STRING()),
+        DataTypes.ROW(DataTypes.INT()),
+        DataTypes.INT.notNull()),
+      ChangelogMode.all()
+    )
+    result.executeInsert("MySink").await()
+
+    val expected =
+      List("1,a,a,10", "1,b,b,11", "2,c,c,20", "3,d,d,30", "3,e,e,31")
+    val resultWithoutOrd = assertAndRemoveOrdinality(
+      TestValuesTableFactory.getResultsAsStrings("MySink").sorted.toList,
+      2)
+    assertThat(resultWithoutOrd).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForChainOfArraysAndMaps(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1),
+        Array("a", "b"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("x", "10")
+          map.put("y", "20")
+          map
+        }),
+      Row.of(
+        Int.box(2),
+        Array("c", "d"), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("z", "30")
+          map.put("w", "40")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "array_data", "map_data"),
+      Array[TypeInformation[_]](
+        Types.INT,
+        Types.OBJECT_ARRAY(Types.STRING),
+        Types.MAP(Types.STRING, Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'array_data, 'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, array_val, array_pos, map_key, map_val, map_pos
+        |FROM T
+        |CROSS JOIN UNNEST(array_data) WITH ORDINALITY AS A(array_val, 
array_pos)
+        |CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS B(map_key, map_val, 
map_pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutOrdinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List(
+      "1,a,1,x,10",
+      "1,a,1,y,20",
+      "1,b,2,x,10",
+      "1,b,2,y,20",
+      "2,c,1,z,30",
+      "2,c,1,w,40",
+      "2,d,2,z,30",
+      "2,d,2,w,40"
+    )
+
+    assertThat(resultsWithoutOrdinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForEmptyArray(): Unit = {
+    val data = List(
+      (1, Array[Int]())
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery = """
+                     |SELECT a, number, ordinality
+                     |FROM T CROSS JOIN UNNEST(b) WITH ORDINALITY AS t(number, 
ordinality)
+                     |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List()
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected)
+  }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityForMapWithNullValues(): Unit = {
+    val data = List(
+      Row.of(
+        Int.box(1), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", null)
+          map
+        }),
+      Row.of(
+        Int.box(2), {
+          val map = new java.util.HashMap[String, String]()
+          map.put("c", "20")
+          map.put("d", null)
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      Array("id", "map_data"),
+      Array[TypeInformation[_]](Types.INT, Types.MAP(Types.STRING, 
Types.STRING))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'id, 
'map_data)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      """
+        |SELECT id, k, v, pos
+        |FROM T CROSS JOIN UNNEST(map_data) WITH ORDINALITY AS f(k, v, pos)
+        |""".stripMargin
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val resultsWithoutordinality = 
assertAndRemoveOrdinality(sink.getAppendResults, 2)
+    val expected = List("1,a,10", "1,b,null", "2,c,20", "2,d,null")
+    assertThat(resultsWithoutordinality.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsFromTableWithOrdinality(): Unit = {
+    val data = List(
+      (2, Array((20, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)
+
+    val sqlQuery =
+      "SELECT a, b, s, t, o FROM T, UNNEST(T.b) WITH ORDINALITY AS A(s, t, o)"
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "2,[20,41.6, 14,45.2136],20,41.6,1",
+      "2,[20,41.6, 14,45.2136],14,45.2136,2",
+      "3,[18,42.6],18,42.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestArrayOfRowsWithNestedFilterWithOrdinality(): Unit = {
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("MyTable", t)
+
+    val sqlQuery =
+      """
+        |SELECT * FROM (
+        |   SELECT a, b1, b2, ord FROM
+        |       (SELECT a, b FROM MyTable) T
+        |       CROSS JOIN
+        |       UNNEST(T.b) WITH ORDINALITY as S(b1, b2, ord)
+        |       WHERE S.b1 >= 12
+        |   ) tmp
+        |WHERE b2 <> '42.6' AND ord <> 2
+    """.stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toDataStream
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,12,45.6,1", "2,13,41.6,1")
+    assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
+  }
+
+  @TestTemplate
+  def testUnnestMultiSetOfRowsFromCollectResultWithOrdinality(): Unit = {
+    val data = List(
+      (1, (12, "45.6")),
+      (2, (12, "45.612")),
+      (2, (13, "41.6")),
+      (3, (14, "45.2136")),
+      (3, (18, "42.6")))
+    val t = StreamingEnvUtil.fromCollection(env, data).toTable(tEnv, 'a, 'b)
+    tEnv.createTemporaryView("T", t)

Review Comment:
   introduce a method for these repeated calls



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala:
##########
@@ -379,4 +380,486 @@ class UnnestITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mo
     val expected = List("a,1", "a,2", "a,3")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @TestTemplate
+  def testUnnestWithOrdinalityWithValuesStream(): Unit = {

Review Comment:
   For the next time: We should not use all these old legacy classes. There is 
a new SemanticTest class. Take a look at e.g. 
`ProcessTableFunctionSemanticTests`. Everything you added can be represented 
without TypeInfo legacy or even .toDataStream/StreamingEnvUtil.fromCollection 
etc.



##########
docs/content/docs/dev/table/sql/queries/joins.md:
##########
@@ -326,16 +326,55 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
-Array Expansion
+Array, Multiset and Map Expansion
 --------------
 
-Returns a new row for each element in the given array. Unnesting `WITH 
ORDINALITY` is not yet supported.
+Unnest returns a new row for each element in the given array, multiset or map. 
Supports both `CROSS JOIN` and `LEFT JOIN`.
+```sql
+-- Returns a new row for each element in a constant array
+SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat'])
+
+id       product_name
+=======  ============
+order_1  shirt
+order_1  pants
+order_1  hat
+
+-- Returns a new row for each element in the array
+-- assuming an Orders table with an array column `product_names`
+SELECT order_id, product_name
+FROM Orders 
+    CROSS JOIN UNNEST(product_names) AS t(product_name)
+```
+
+Unnesting `WITH ORDINALITY` is also supported.
+
 
 ```sql
-SELECT order_id, tag
-FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
+-- Returns a new row for each element in a constant array and its position in 
the array
+SELECT * 
+FROM (VALUES('order_1'))

Review Comment:
   because actually the parentheses are unnecessary if a row contains only one 
field



##########
docs/content/docs/dev/table/sql/queries/joins.md:
##########
@@ -326,16 +326,55 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
-Array Expansion
+Array, Multiset and Map Expansion
 --------------
 
-Returns a new row for each element in the given array. Unnesting `WITH 
ORDINALITY` is not yet supported.
+Unnest returns a new row for each element in the given array, multiset or map. 
Supports both `CROSS JOIN` and `LEFT JOIN`.
+```sql
+-- Returns a new row for each element in a constant array
+SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat'])
+
+id       product_name
+=======  ============
+order_1  shirt
+order_1  pants
+order_1  hat
+
+-- Returns a new row for each element in the array
+-- assuming an Orders table with an array column `product_names`
+SELECT order_id, product_name
+FROM Orders 
+    CROSS JOIN UNNEST(product_names) AS t(product_name)
+```
+
+Unnesting `WITH ORDINALITY` is also supported.
+
 
 ```sql
-SELECT order_id, tag
-FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
+-- Returns a new row for each element in a constant array and its position in 
the array
+SELECT * 
+FROM (VALUES('order_1'))
+    CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat']) 
+        WITH ORDINALITY AS t(product_name, index)
+
+id       product_name  index
+=======  ============  =====
+order_1  shirt             1
+order_1  pants             2
+order_1  hat               3
+
+-- Returns a new row for each element and its position in the array
+-- assuming an Orders table with an array column `product_names`
+SELECT order_id, product_name, product_index
+FROM Orders 
+    CROSS JOIN UNNEST(product_names) 
+        WITH ORDINALITY AS t(product_name, product_index)
 ```
 
+A unnest with ordinality will return each element and the position of the 
element in the data structure, 1-indexed. 
+The order of the elements for arrays is guaranteed. Since maps and multisets 
are unordered, the order of the elements is not guaranteed.
+Currently, WITH ORDINALITY only supports cross joins but not left joins.

Review Comment:
   ```suggestion
   Currently, `WITH ORDINALITY` only supports `CROSS JOIN` but not `LEFT JOIN`.
   ```
   It helps to upper case concepts.



##########
docs/content/docs/dev/table/sql/queries/joins.md:
##########
@@ -326,16 +326,55 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
-Array Expansion
+Array, Multiset and Map Expansion
 --------------
 
-Returns a new row for each element in the given array. Unnesting `WITH 
ORDINALITY` is not yet supported.
+Unnest returns a new row for each element in the given array, multiset or map. 
Supports both `CROSS JOIN` and `LEFT JOIN`.
+```sql
+-- Returns a new row for each element in a constant array
+SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat'])
+
+id       product_name
+=======  ============
+order_1  shirt
+order_1  pants
+order_1  hat
+
+-- Returns a new row for each element in the array
+-- assuming an Orders table with an array column `product_names`
+SELECT order_id, product_name
+FROM Orders 
+    CROSS JOIN UNNEST(product_names) AS t(product_name)
+```
+
+Unnesting `WITH ORDINALITY` is also supported.
+
 
 ```sql
-SELECT order_id, tag
-FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
+-- Returns a new row for each element in a constant array and its position in 
the array
+SELECT * 
+FROM (VALUES('order_1'))
+    CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat']) 
+        WITH ORDINALITY AS t(product_name, index)
+
+id       product_name  index
+=======  ============  =====
+order_1  shirt             1
+order_1  pants             2
+order_1  hat               3
+
+-- Returns a new row for each element and its position in the array
+-- assuming an Orders table with an array column `product_names`
+SELECT order_id, product_name, product_index
+FROM Orders 
+    CROSS JOIN UNNEST(product_names) 
+        WITH ORDINALITY AS t(product_name, product_index)
 ```
 
+A unnest with ordinality will return each element and the position of the 
element in the data structure, 1-indexed. 

Review Comment:
   ```suggestion
   An unnest with ordinality will return each element and the position of the 
element in the data structure, 1-indexed. 
   ```



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/UnnestRowsFunctionBase.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.runtime.functions.BuiltInSpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Base class for flattening ARRAY, MAP, and MULTISET using a table function. 
*/
+@Internal
+public abstract class UnnestRowsFunctionBase extends 
BuiltInSpecializedFunction {
+
+    public UnnestRowsFunctionBase() {
+        super(BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
+    }
+
+    @Override
+    public UserDefinedFunction specialize(SpecializedContext context) {
+        final LogicalType argType =
+                
context.getCallContext().getArgumentDataTypes().get(0).getLogicalType();
+        switch (argType.getTypeRoot()) {
+            case ARRAY:
+                final ArrayType arrayType = (ArrayType) argType;
+                return createCollectionUnnestFunction(
+                        context,
+                        arrayType.getElementType(),
+                        
ArrayData.createElementGetter(arrayType.getElementType()));
+            case MULTISET:
+                final MultisetType multisetType = (MultisetType) argType;
+                return createCollectionUnnestFunction(
+                        context,
+                        multisetType.getElementType(),
+                        
ArrayData.createElementGetter(multisetType.getElementType()));
+            case MAP:
+                final MapType mapType = (MapType) argType;
+                return createMapUnnestFunction(
+                        context,
+                        RowType.of(false, mapType.getKeyType(), 
mapType.getValueType()),
+                        ArrayData.createElementGetter(mapType.getKeyType()),
+                        ArrayData.createElementGetter(mapType.getValueType()));
+            default:
+                throw new UnsupportedOperationException("Unsupported type for 
UNNEST: " + argType);
+        }
+    }
+
+    protected abstract UserDefinedFunction createCollectionUnnestFunction(
+            SpecializedContext context,
+            LogicalType elementType,
+            ArrayData.ElementGetter elementGetter);
+
+    protected abstract UserDefinedFunction createMapUnnestFunction(
+            SpecializedContext context,
+            RowType keyValTypes,
+            ArrayData.ElementGetter keyGetter,
+            ArrayData.ElementGetter valueGetter);
+
+    public static LogicalType getUnnestedType(LogicalType logicalType, boolean 
withOrdinality) {
+        LogicalType elementType;
+        switch (logicalType.getTypeRoot()) {
+            case ARRAY:
+                elementType = ((ArrayType) logicalType).getElementType();
+                break;
+            case MULTISET:
+                elementType = ((MultisetType) logicalType).getElementType();
+                break;
+            case MAP:
+                MapType mapType = (MapType) logicalType;
+                elementType = RowType.of(false, mapType.getKeyType(), 
mapType.getValueType());
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported UNNEST 
type: " + logicalType);
+        }
+
+        if (withOrdinality) {
+            return wrapWithOrdinality(elementType);
+        }
+        return elementType;
+    }
+
+    public static LogicalType wrapWithOrdinality(LogicalType baseType) {
+        // If baseType is already a ROW, extract its fields and add an 
ordinality field
+        if (baseType instanceof RowType) {
+            RowType rowType = (RowType) baseType;
+            int fieldCount = rowType.getFieldCount();
+            LogicalType[] types = new LogicalType[fieldCount + 1];
+            String[] names = new String[types.length];
+
+            for (int i = 0; i < fieldCount; i++) {
+                types[i] = rowType.getTypeAt(i);
+                names[i] = "f" + i;

Review Comment:
   use `r = LogicalTypeUtils#toRowType`?
   then 
   `new RowType(Stream.concat(r.getFields().stream(), Stream.of(new 
RowField("ordinality", new IntType(false)))`.
   we don't want to lose field names and should improve code readability.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/UnnestRowsFunctionBase.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.runtime.functions.BuiltInSpecializedFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Base class for flattening ARRAY, MAP, and MULTISET using a table function. 
*/
+@Internal
+public abstract class UnnestRowsFunctionBase extends 
BuiltInSpecializedFunction {
+
+    public UnnestRowsFunctionBase() {
+        super(BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);

Review Comment:
   I was also confused by this, please pass it as a parameter.



##########
docs/content/docs/dev/table/sql/queries/joins.md:
##########
@@ -326,16 +326,55 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
-Array Expansion
+Array, Multiset and Map Expansion
 --------------
 
-Returns a new row for each element in the given array. Unnesting `WITH 
ORDINALITY` is not yet supported.
+Unnest returns a new row for each element in the given array, multiset or map. 
Supports both `CROSS JOIN` and `LEFT JOIN`.
+```sql
+-- Returns a new row for each element in a constant array
+SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat'])
+
+id       product_name
+=======  ============
+order_1  shirt
+order_1  pants
+order_1  hat
+
+-- Returns a new row for each element in the array
+-- assuming a Orders table with an array column `product_names`
+SELECT order_id, product_name
+FROM Orders 
+    CROSS JOIN UNNEST(product_names) AS t(product_name)
+```
+
+Unnesting `WITH ORDINALITY` is also supported.
+
 
 ```sql
-SELECT order_id, tag
-FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
+-- Returns a new row for each element in a constant array and its position in 
the array
+SELECT * 
+FROM (VALUES('order_1'))
+    CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat']) 
+        WITH ORDINALITY AS t(product_name, index)
+
+id       product_name  index
+=======  ============  =====
+order_1  shirt             1
+order_1  pants             2
+order_1  hat               3
+
+-- Returns a new row for each element and its position in the array
+-- assuming a Orders table with an array column `product_names`
+SELECT order_id, product_name, product_index

Review Comment:
   Could we make an example for multisets? It's not clear to me how WITH 
ORDINALITY behaves with them. The tests seem to drop the index?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/UnnestRowsFunction.java:
##########
@@ -19,169 +19,89 @@
 package org.apache.flink.table.runtime.functions.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.UserDefinedFunction;
-import org.apache.flink.table.runtime.functions.BuiltInSpecializedFunction;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.MapType;
-import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
  * Flattens ARRAY, MAP, and MULTISET using a table function. It does this by 
another level of
  * specialization using a subclass of {@link UnnestTableFunctionBase}.
  */
 @Internal
-public class UnnestRowsFunction extends BuiltInSpecializedFunction {
+public class UnnestRowsFunction extends UnnestRowsFunctionBase {
 
     public UnnestRowsFunction() {
-        super(BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
+        super();
     }
 
     @Override
-    public UserDefinedFunction specialize(SpecializedContext context) {
-        final LogicalType argType =
-                
context.getCallContext().getArgumentDataTypes().get(0).getLogicalType();
-        switch (argType.getTypeRoot()) {
-            case ARRAY:
-                final ArrayType arrayType = (ArrayType) argType;
-                return new CollectionUnnestTableFunction(
-                        context,
-                        arrayType.getElementType(),
-                        
ArrayData.createElementGetter(arrayType.getElementType()));
-            case MULTISET:
-                final MultisetType multisetType = (MultisetType) argType;
-                return new CollectionUnnestTableFunction(
-                        context,
-                        multisetType.getElementType(),
-                        
ArrayData.createElementGetter(multisetType.getElementType()));
-            case MAP:
-                final MapType mapType = (MapType) argType;
-                return new MapUnnestTableFunction(
-                        context,
-                        RowType.of(false, mapType.getKeyType(), 
mapType.getValueType()),
-                        ArrayData.createElementGetter(mapType.getKeyType()),
-                        ArrayData.createElementGetter(mapType.getValueType()));
-            default:
-                throw new UnsupportedOperationException("Unsupported type for 
UNNEST: " + argType);
-        }
-    }
-
-    public static LogicalType getUnnestedType(LogicalType logicalType) {
-        switch (logicalType.getTypeRoot()) {
-            case ARRAY:
-                return ((ArrayType) logicalType).getElementType();
-            case MULTISET:
-                return ((MultisetType) logicalType).getElementType();
-            case MAP:
-                final MapType mapType = (MapType) logicalType;
-                return RowType.of(false, mapType.getKeyType(), 
mapType.getValueType());
-            default:
-                throw new UnsupportedOperationException("Unsupported UNNEST 
type: " + logicalType);
-        }
+    protected UserDefinedFunction createCollectionUnnestFunction(
+            SpecializedContext context,
+            LogicalType elementType,
+            ArrayData.ElementGetter elementGetter) {
+        return new CollectionUnnestFunction(context, elementType, 
elementGetter);
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    // Runtime Implementation
-    // 
--------------------------------------------------------------------------------------------
-
-    private abstract static class UnnestTableFunctionBase extends 
BuiltInTableFunction<Object> {
-
-        private final transient DataType outputDataType;
-
-        UnnestTableFunctionBase(SpecializedContext context, LogicalType 
outputType) {
-            super(BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS, context);
-            // The output type in the context is already wrapped, however, the 
result of the
-            // function is not. Therefore, we need a custom output type.
-            outputDataType = DataTypes.of(outputType).toInternal();
-        }
-
-        @Override
-        public DataType getOutputDataType() {
-            return outputDataType;
-        }
+    @Override
+    protected UserDefinedFunction createMapUnnestFunction(
+            SpecializedContext context,
+            RowType keyValTypes,
+            ArrayData.ElementGetter keyGetter,
+            ArrayData.ElementGetter valueGetter) {
+        return new MapUnnestFunction(context, keyValTypes, keyGetter, 
valueGetter);
     }
 
     /** Table function that unwraps the elements of a collection (array or 
multiset). */
-    public static final class CollectionUnnestTableFunction extends 
UnnestTableFunctionBase {
+    public static final class CollectionUnnestFunction extends 
UnnestTableFunctionBase {
 
         private static final long serialVersionUID = 1L;
 
         private final ArrayData.ElementGetter elementGetter;
 
-        public CollectionUnnestTableFunction(
+        public CollectionUnnestFunction(
                 SpecializedContext context,
-                LogicalType outputType,
+                LogicalType elementType,
                 ArrayData.ElementGetter elementGetter) {
-            super(context, outputType);
+            super(context, elementType);
             this.elementGetter = elementGetter;
         }
 
         public void eval(ArrayData arrayData) {
-            if (arrayData == null) {
-                return;
-            }
-            final int size = arrayData.size();
-            for (int pos = 0; pos < size; pos++) {
-                collect(elementGetter.getElementOrNull(arrayData, pos));
-            }
+            evalArrayData(arrayData, elementGetter, (element, position) -> 
collect(element));
         }
 
         public void eval(MapData mapData) {
-            if (mapData == null) {
-                return;
-            }
-            final int size = mapData.size();
-            final ArrayData keys = mapData.keyArray();
-            final ArrayData values = mapData.valueArray();
-            for (int pos = 0; pos < size; pos++) {
-                final int multiplier = values.getInt(pos);
-                final Object key = elementGetter.getElementOrNull(keys, pos);
-                for (int i = 0; i < multiplier; i++) {
-                    collect(key);
-                }
-            }
+            evalMultisetData(mapData, elementGetter, (element, position) -> 
collect(element));
         }
     }
 
     /** Table function that unwraps the elements of a map. */
-    public static final class MapUnnestTableFunction extends 
UnnestTableFunctionBase {
+    public static final class MapUnnestFunction extends 
UnnestTableFunctionBase {
 
         private static final long serialVersionUID = 1L;
 
         private final ArrayData.ElementGetter keyGetter;
-
         private final ArrayData.ElementGetter valueGetter;
 
-        public MapUnnestTableFunction(
+        public MapUnnestFunction(
                 SpecializedContext context,
-                LogicalType outputType,
+                LogicalType keyValTypes,

Review Comment:
   rename this back? it still describes the output type, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to