GitHub user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5619#discussion_r188908643
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
 ---
    @@ -18,74 +18,129 @@
     
     package org.apache.flink.table.plan.util
     
    +import java.util
    +
     import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
    -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
    +import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, 
ObjectArrayTypeInfo}
     import org.apache.flink.table.functions.TableFunction
     
    +import scala.collection.JavaConverters._
    +
     class ObjectExplodeTableFunc extends TableFunction[Object] {
       def eval(arr: Array[Object]): Unit = {
         arr.foreach(collect)
       }
    +
    +  def eval(map: util.Map[Object, Integer]): Unit = {
    +    CommonCollect.collect(map, collect)
    +  }
     }
     
     class FloatExplodeTableFunc extends TableFunction[Float] {
       def eval(arr: Array[Float]): Unit = {
         arr.foreach(collect)
       }
    +
    +  def eval(map: util.Map[Float, Integer]): Unit = {
    +    CommonCollect.collect(map, collect)
    +  }
     }
     
     class ShortExplodeTableFunc extends TableFunction[Short] {
       def eval(arr: Array[Short]): Unit = {
         arr.foreach(collect)
       }
    +
    +  def eval(map: util.Map[Short, Integer]): Unit = {
    +    CommonCollect.collect(map, collect)
    +  }
     }
     class IntExplodeTableFunc extends TableFunction[Int] {
       def eval(arr: Array[Int]): Unit = {
         arr.foreach(collect)
       }
    +
    +  def eval(map: util.Map[Int, Integer]): Unit = {
    +    CommonCollect.collect(map, collect)
    +  }
     }
     
     class LongExplodeTableFunc extends TableFunction[Long] {
       def eval(arr: Array[Long]): Unit = {
         arr.foreach(collect)
       }
    +
    +  def eval(map: util.Map[Long, Integer]): Unit = {
    +    CommonCollect.collect(map, collect)
    +  }
     }
     
     class DoubleExplodeTableFunc extends TableFunction[Double] {
       def eval(arr: Array[Double]): Unit = {
         arr.foreach(collect)
       }
    +
    +  def eval(map: util.Map[Double, Integer]): Unit = {
    +    CommonCollect.collect(map, collect)
    +  }
     }
     
     class ByteExplodeTableFunc extends TableFunction[Byte] {
       def eval(arr: Array[Byte]): Unit = {
         arr.foreach(collect)
       }
    +
    +  def eval(map: util.Map[Byte, Integer]): Unit = {
    +    CommonCollect.collect(map, collect)
    +  }
     }
     
     class BooleanExplodeTableFunc extends TableFunction[Boolean] {
       def eval(arr: Array[Boolean]): Unit = {
         arr.foreach(collect)
       }
    +
    +  def eval(map: util.Map[Boolean, Integer]): Unit = {
    +    CommonCollect.collect(map, collect)
    +  }
    +}
    +
    +object CommonCollect {
    +  def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): 
Unit = {
    +    map.asScala.foreach{ e =>
    +      for (i <- 0 until e._2) {
    +        collectFunc(e._1)
    +      }
    +    }
    +  }
     }
     
     object ExplodeFunctionUtil {
    -  def explodeTableFuncFromType(ti: TypeInformation[_]):TableFunction[_] = {
    +  def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = 
{
         ti match {
    -      case pat: PrimitiveArrayTypeInfo[_] => {
    -        pat.getComponentType match {
    -          case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
    -          case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
    -          case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
    -          case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
    -          case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
    -          case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
    -          case BasicTypeInfo.BOOLEAN_TYPE_INFO => new 
BooleanExplodeTableFunc
    -        }
    -      }
    +      case pat: PrimitiveArrayTypeInfo[_] => 
createTableFuncByType(pat.getComponentType)
    +
           case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
    +
           case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
    -      case _ => throw new UnsupportedOperationException(ti.toString + "IS 
NOT supported")
    +
    +      case mt: MultisetTypeInfo[_] => 
createTableFuncByType(mt.getElementTypeInfo)
    +
    +      case _ => throw new UnsupportedOperationException(ti.toString + " IS 
NOT supported")
    +    }
    +  }
    +
    +  def createTableFuncByType(typeInfo: TypeInformation[_]): 
TableFunction[_] = {
    +    typeInfo match {
    +      case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
    +      case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
    +      case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
    +      case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
    +      case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
    +      case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
    +      case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc
    +      case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc
    --- End diff --
    
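    Duplicate: this `case BasicTypeInfo.BOOLEAN_TYPE_INFO` line repeats the one directly above it, so it can never match; please remove it.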


---

Reply via email to