Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5619#discussion_r188906074 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala --- @@ -18,74 +18,129 @@ package org.apache.flink.table.plan.util +import java.util + import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.functions.TableFunction +import scala.collection.JavaConverters._ + class ObjectExplodeTableFunc extends TableFunction[Object] { def eval(arr: Array[Object]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Object, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class FloatExplodeTableFunc extends TableFunction[Float] { def eval(arr: Array[Float]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Float, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class ShortExplodeTableFunc extends TableFunction[Short] { def eval(arr: Array[Short]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Short, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class IntExplodeTableFunc extends TableFunction[Int] { def eval(arr: Array[Int]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Int, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class LongExplodeTableFunc extends TableFunction[Long] { def eval(arr: Array[Long]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Long, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class DoubleExplodeTableFunc extends TableFunction[Double] { def eval(arr: Array[Double]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Double, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class ByteExplodeTableFunc extends TableFunction[Byte] { def eval(arr: Array[Byte]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Byte, Integer]): Unit = { + CommonCollect.collect(map, collect) + } } class BooleanExplodeTableFunc extends TableFunction[Boolean] { def eval(arr: Array[Boolean]): Unit = { arr.foreach(collect) } + + def eval(map: util.Map[Boolean, Integer]): Unit = { + CommonCollect.collect(map, collect) + } +} + +object CommonCollect { + def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): Unit = { + map.asScala.foreach{ e => --- End diff -- We should not use any Scala magic in runtime code. Can you convert it to a plain `while` loop? Would be great if you could also convert the `foreach` loops in the other methods of this class.
---