This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2c39024 [FLINK-12200][table-planner] Support UNNEST for MAP types
2c39024 is described below
commit 2c390243739dcbf4a4abe93765b5b2064f6e26c7
Author: Artsem Semianenka <[email protected]>
AuthorDate: Fri Apr 26 16:36:17 2019 +0300
[FLINK-12200][table-planner] Support UNNEST for MAP types
This closes #8179
---
.../plan/rules/logical/LogicalUnnestRule.scala | 14 +++++-
.../table/plan/util/ExplodeFunctionUtil.scala | 15 +++++-
.../flink/table/runtime/batch/sql/JoinITCase.scala | 53 ++++++++++++++++++++++
3 files changed, 80 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
index 09886e0..87dc579 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -29,10 +29,11 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.Uncollect
import org.apache.calcite.rel.logical._
import org.apache.calcite.sql.`type`.AbstractSqlType
+import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-import org.apache.flink.table.plan.schema.{ArrayRelDataType,
MultisetRelDataType}
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType,
MultisetRelDataType}
import org.apache.flink.table.plan.util.ExplodeFunctionUtil
class LogicalUnnestRule(
@@ -94,6 +95,17 @@ class LogicalUnnestRule(
case arrayType: ArrayRelDataType =>
(arrayType.getComponentType,
ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo))
+
+ case map: MapRelDataType =>
+ val keyTypeInfo = FlinkTypeFactory.toTypeInfo(map.keyType)
+ val valueTypeInfo = FlinkTypeFactory.toTypeInfo(map.valueType)
+ val componentTypeInfo = Types.ROW(keyTypeInfo, valueTypeInfo)
+ val componentType =
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+ .createTypeFromTypeInfo(componentTypeInfo, isNullable = true)
+
+ val explodeFunction =
ExplodeFunctionUtil.explodeTableFuncFromType(map.typeInfo)
+ (componentType, explodeFunction)
+
case mt: MultisetRelDataType =>
(mt.getComponentType,
ExplodeFunctionUtil.explodeTableFuncFromType(mt.typeInfo))
case _ => throw new TableException(s"Unsupported UNNEST on type:
${dataType.toString}")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
index cfcaa84..07fa407 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
@@ -21,9 +21,12 @@ package org.apache.flink.table.plan.util
import java.util
import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo,
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{MultisetTypeInfo,
ObjectArrayTypeInfo}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo,
ObjectArrayTypeInfo}
import org.apache.flink.table.api.TableException
import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
abstract class ExplodeTableFunction[T] extends TableFunction[T] {
@@ -54,6 +57,14 @@ abstract class ExplodeTableFunction[T] extends TableFunction[T] {
}
}
+/**
+  * Table function that unnests a MAP value: every entry of the map is emitted as a
+  * two-field [[Row]] holding the entry's key and value, in the map's iteration order.
+  */
+class MapExplodeTableFunc extends TableFunction[Row] {
+
+  /**
+    * Emits one `Row(key, value)` per entry of `map`.
+    *
+    * @param map the map to unnest; a `null` map produces no rows
+    */
+  def eval(map: util.Map[Object, Object]): Unit = {
+    // Guard against NULL map values, which would otherwise fail with a NullPointerException.
+    if (map != null) {
+      map.asScala.foreach { case (key, value) =>
+        collect(Row.of(key, value))
+      }
+    }
+  }
+}
+
class ObjectExplodeTableFunc extends ExplodeTableFunction[Object] {
def eval(arr: Array[Object]): Unit = {
collectArray(arr)
@@ -144,6 +155,8 @@ object ExplodeFunctionUtil {
case mt: MultisetTypeInfo[_] =>
createTableFuncByType(mt.getElementTypeInfo)
+ case mt: MapTypeInfo[_,_] => new MapExplodeTableFunc
+
case _ => throw new TableException("Unnesting of '" + ti.toString + "'
is not supported.")
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
index 5e9ab69..6246609 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -18,8 +18,11 @@
package org.apache.flink.table.runtime.batch.sql
+import java.util
+
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
import
org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
@@ -490,6 +493,56 @@ class JoinITCase(
}
@Test
+ def testCrossWithUnnestForMap(): Unit = { // CROSS JOIN UNNEST over a MAP column: expects one output row per map entry
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = BatchTableEnvironment.create(env, config)
+
+ val data = List( // three input rows, each (Int, Long, java.util.HashMap[String, String])
+ Row.of(Int.box(1),
+ Long.box(11L), {
+ val map = new util.HashMap[String, String]()
+ map.put("a", "10")
+ map.put("b", "11")
+ map
+ }),
+ Row.of(Int.box(2),
+ Long.box(22L), {
+ val map = new util.HashMap[String, String]()
+ map.put("c", "20")
+ map
+ }),
+ Row.of(Int.box(3),
+ Long.box(33L), {
+ val map = new util.HashMap[String, String]()
+ map.put("d", "30")
+ map.put("e", "31")
+ map
+ })
+ )
+
+ implicit val typeInfo = Types.ROW( // row type with fields a, b, c; presumably resolved implicitly by env.fromCollection below - confirm
+ fieldNames = Array("a", "b", "c"),
+ types = Array(Types.INT,
+ Types.LONG,
+ Types.MAP(Types.STRING, Types.STRING))
+ )
+ val table = tEnv.fromDataSet(env.fromCollection(data))
+ tEnv.registerTable("src", table)
+
+
+ val sqlQuery = // UNNEST(c) is aliased as f(k, v); only the value column v is selected next to a and b
+ """
+ |SELECT a,b,v FROM src
+ |CROSS JOIN UNNEST(c) as f (k,v)
+ """.stripMargin
+ val result = tEnv.sqlQuery(sqlQuery)
+
+ val expected = List("1,11,10", "1,11,11", "2,22,20", "3,33,30", "3,33,31") // one "a,b,v" entry per map entry
+ val results = result.collect().toList // sorted by string form below, so the comparison does not depend on result order
+ assertEquals(expected.toString(), results.sortWith(_.toString <
_.toString).toString())
+ }
+
+ @Test
def testJoinWithUnnestOfTuple(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env, config)