[ https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478969#comment-16478969 ]
ASF GitHub Bot commented on FLINK-8838: --------------------------------------- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188908643 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,129 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction +import scala.collection.JavaConverters._ + class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Object, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class FloatExplodeTableFunc extends TableFunction[Float] { def eval(arr: Array[Float]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Float, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class ShortExplodeTableFunc extends TableFunction[Short] { def eval(arr: Array[Short]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Short, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class IntExplodeTableFunc extends TableFunction[Int] { def eval(arr: Array[Int]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Int, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class LongExplodeTableFunc extends TableFunction[Long] { def eval(arr: Array[Long]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Long, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class DoubleExplodeTableFunc extends TableFunction[Double] { def eval(arr: Array[Double]): Unit = { arr.foreach(collect) } + + def eval(map: 
util.Map[Double, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class ByteExplodeTableFunc extends TableFunction[Byte] { def eval(arr: Array[Byte]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Byte, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class BooleanExplodeTableFunc extends TableFunction[Boolean] { def eval(arr: Array[Boolean]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Boolean, Integer]): Unit = { + CommonCollect.collect(map, collect) + } +} + +object CommonCollect { + def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): Unit = { + map.asScala.foreach{ e => + for (i <- 0 until e._2) { + collectFunc(e._1) + } + } + } } object ExplodeFunctionUtil { - def explodeTableFuncFromType(ti: TypeInformation[_]):TableFunction[_] = { + def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = { ti match { - case pat: PrimitiveArrayTypeInfo[_] => { - pat.getComponentType match { - case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc - case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc - case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc - case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc - case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc - case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc - case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc - } - } + case pat: PrimitiveArrayTypeInfo[_] => createTableFuncByType(pat.getComponentType) + case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc + case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc - case _ => throw new UnsupportedOperationException(ti.toString + "IS NOT supported") + + case mt: MultisetTypeInfo[_] => createTableFuncByType(mt.getElementTypeInfo) + + case _ => throw new UnsupportedOperationException(ti.toString + " IS NOT supported") + } + } + + def createTableFuncByType(typeInfo: TypeInformation[_]): 
TableFunction[_] = { + typeInfo match { + case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc + case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc + case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc + case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc + case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc + case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc + case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc + case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc --- End diff -- Duplicate: this `BasicTypeInfo.BOOLEAN_TYPE_INFO` case repeats the one directly above it and can never match; please remove it. > Add Support for UNNEST a MultiSet type field > -------------------------------------------- > > Key: FLINK-8838 > URL: https://issues.apache.org/jira/browse/FLINK-8838 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: lincoln.lee > Assignee: lincoln.lee > Priority: Major > > MultisetTypeInfo was introduced by FLINK-7491, but UNNEST currently supports the Array type > only, so it would be nice to support UNNEST on a MultiSet type field as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)