This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c39024  [FLINK-12200][table-planner] Support UNNEST for MAP types
2c39024 is described below

commit 2c390243739dcbf4a4abe93765b5b2064f6e26c7
Author: Artsem Semianenka <[email protected]>
AuthorDate: Fri Apr 26 16:36:17 2019 +0300

    [FLINK-12200][table-planner] Support UNNEST for MAP types
    
    This closes #8179
---
 .../plan/rules/logical/LogicalUnnestRule.scala     | 14 +++++-
 .../table/plan/util/ExplodeFunctionUtil.scala      | 15 +++++-
 .../flink/table/runtime/batch/sql/JoinITCase.scala | 53 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 2 deletions(-)
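
For context, the query shape this commit enables (taken from the test added
in JoinITCase.scala below, where table src has columns a INT, b BIGINT,
c MAP<VARCHAR, VARCHAR>):

    SELECT a, b, v FROM src
    CROSS JOIN UNNEST(c) AS f (k, v)

Each entry of the map column c yields one output row, with the entry's key
bound to k and its value bound to v.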

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
index 09886e0..87dc579 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -29,10 +29,11 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.Uncollect
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.sql.`type`.AbstractSqlType
+import org.apache.flink.api.scala.typeutils.Types
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-import org.apache.flink.table.plan.schema.{ArrayRelDataType, MultisetRelDataType}
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, MultisetRelDataType}
 import org.apache.flink.table.plan.util.ExplodeFunctionUtil
 
 class LogicalUnnestRule(
@@ -94,6 +95,17 @@ class LogicalUnnestRule(
             case arrayType: ArrayRelDataType =>
               (arrayType.getComponentType,
                 ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo))
+
+            case map: MapRelDataType =>
+              val keyTypeInfo = FlinkTypeFactory.toTypeInfo(map.keyType)
+              val valueTypeInfo = FlinkTypeFactory.toTypeInfo(map.valueType)
+              val componentTypeInfo = Types.ROW(keyTypeInfo, valueTypeInfo)
+              val componentType = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+                .createTypeFromTypeInfo(componentTypeInfo, isNullable = true)
+
+              val explodeFunction = ExplodeFunctionUtil.explodeTableFuncFromType(map.typeInfo)
+              (componentType, explodeFunction)
+
             case mt: MultisetRelDataType =>
               (mt.getComponentType, ExplodeFunctionUtil.explodeTableFuncFromType(mt.typeInfo))
             case _ => throw new TableException(s"Unsupported UNNEST on type: ${dataType.toString}")
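
Note: the MAP branch above mirrors the existing ARRAY branch, except that
the element type is a two-field row. A minimal sketch of the element type
it builds, assuming a MAP<VARCHAR, INT> column (Types here is the
org.apache.flink.api.scala.typeutils.Types imported above):

    import org.apache.flink.api.scala.typeutils.Types

    // ROW(key, value) element type used when unnesting a MAP<VARCHAR, INT>:
    val componentTypeInfo = Types.ROW(Types.STRING, Types.INT)
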
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
index cfcaa84..07fa407 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
@@ -21,9 +21,12 @@ package org.apache.flink.table.plan.util
 import java.util
 
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
 
 abstract class ExplodeTableFunction[T] extends TableFunction[T] {
 
@@ -54,6 +57,14 @@ abstract class ExplodeTableFunction[T] extends TableFunction[T] {
   }
 }
 
+class MapExplodeTableFunc extends TableFunction[Row] {
+  def eval(map: util.Map[Object, Object]): Unit = {
+    map.asScala.foreach { case (key, value) =>
+      collect(Row.of(key, value))
+    }
+  }
+}
+
 class ObjectExplodeTableFunc extends ExplodeTableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
     collectArray(arr)
@@ -144,6 +155,8 @@ object ExplodeFunctionUtil {
 
       case mt: MultisetTypeInfo[_] => createTableFuncByType(mt.getElementTypeInfo)
 
+      case _: MapTypeInfo[_, _] => new MapExplodeTableFunc
+
       case _ => throw new TableException("Unnesting of '" + ti.toString + "' is not supported.")
     }
   }
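
The new MapExplodeTableFunc accepts util.Map[Object, Object], so a single
untyped instance serves every key/value combination; the typed ROW(k, v)
result type is supplied by LogicalUnnestRule. A standalone sketch of the
same entry-to-row expansion (explodeMap is a hypothetical helper name, not
part of this commit):

    import java.util
    import scala.collection.JavaConverters._
    import org.apache.flink.types.Row

    // One Row per map entry: key in field 0, value in field 1.
    def explodeMap(map: util.Map[Object, Object]): Seq[Row] =
      map.asScala.toSeq.map { case (k, v) => Row.of(k, v) }
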
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
index 5e9ab69..6246609 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -18,8 +18,11 @@
 
 package org.apache.flink.table.runtime.batch.sql
 
+import java.util
+
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
@@ -490,6 +493,56 @@ class JoinITCase(
   }
 
   @Test
+  def testCrossWithUnnestForMap(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env, config)
+
+    val data = List(
+      Row.of(Int.box(1),
+        Long.box(11L), {
+          val map = new util.HashMap[String, String]()
+          map.put("a", "10")
+          map.put("b", "11")
+          map
+        }),
+      Row.of(Int.box(2),
+        Long.box(22L), {
+          val map = new util.HashMap[String, String]()
+          map.put("c", "20")
+          map
+        }),
+      Row.of(Int.box(3),
+        Long.box(33L), {
+          val map = new util.HashMap[String, String]()
+          map.put("d", "30")
+          map.put("e", "31")
+          map
+        })
+    )
+
+    implicit val typeInfo = Types.ROW(
+      fieldNames = Array("a", "b", "c"),
+      types = Array(Types.INT,
+        Types.LONG,
+        Types.MAP(Types.STRING, Types.STRING))
+    )
+    val table = tEnv.fromDataSet(env.fromCollection(data))
+    tEnv.registerTable("src", table)
+
+
+    val sqlQuery =
+      """
+        |SELECT a,b,v FROM src
+        |CROSS JOIN UNNEST(c) as f (k,v)
+      """.stripMargin
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = List("1,11,10", "1,11,11", "2,22,20", "3,33,30", "3,33,31")
+    val results = result.collect().toList
+    assertEquals(expected.toString(), results.sortWith(_.toString < _.toString).toString())
+  }
+
+  @Test
   def testJoinWithUnnestOfTuple(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = BatchTableEnvironment.create(env, config)
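
A self-contained usage sketch modeled on the new test (the object name, the
example schema, and the single-argument BatchTableEnvironment.create
overload are illustrative assumptions, not part of the commit):

    import java.util

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    object UnnestMapExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = BatchTableEnvironment.create(env)

        // A single row: an id plus a MAP<VARCHAR, VARCHAR> attribute column.
        val attrs = new util.HashMap[String, String]()
        attrs.put("color", "red")
        attrs.put("size", "large")

        implicit val typeInfo = Types.ROW(
          fieldNames = Array("id", "attrs"),
          types = Array(Types.INT, Types.MAP(Types.STRING, Types.STRING)))

        val table = tEnv.fromDataSet(env.fromCollection(List(Row.of(Int.box(1), attrs))))
        tEnv.registerTable("items", table)

        // One output row per map entry: (1,color,red) and (1,size,large).
        tEnv.sqlQuery("SELECT id, k, v FROM items CROSS JOIN UNNEST(attrs) AS f (k, v)")
          .toDataSet[Row]
          .print()
      }
    }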
