Hi there,

I cannot figure out how the Scala base types (e.g. scala.Int, scala.Double,
etc.) are mapped to the Flink runtime.

It seems that they are not treated the same as their Java counterparts
(e.g. java.lang.Integer, java.lang.Double). For example, if I write the
following code:

val inputFormat = new CsvInputFormat[FlinkTuple3[Int, Int, String]](
  inPath, "\n", '\t', classOf[Int], classOf[Int], classOf[String])
val typeInformation = new fjava.typeutils.TupleTypeInfo[FlinkTuple3[Int, Int, String]](
  BasicTypeInfo.INT_TYPE_INFO,
  BasicTypeInfo.INT_TYPE_INFO,
  BasicTypeInfo.STRING_TYPE_INFO)
env.createInput(inputFormat, typeInformation)

I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: The type 'int' is not supported for the CSV input format.
    at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:158)
    at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:133)
    at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:83)
    ...
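
As far as I can tell, the 'int' in the message comes from classOf[Int]
itself: in plain Scala it resolves to the JVM primitive class rather than
to java.lang.Integer. Here is a minimal check with no Flink involved (the
object and value names are mine, chosen only for illustration):

```scala
object ClassOfCheck {
  // classOf[Int] yields the JVM primitive class, the same object as
  // java.lang.Integer.TYPE, not the boxed java.lang.Integer class.
  val scalaIntClass: Class[Int] = classOf[Int]
  val boxedIntClass: Class[java.lang.Integer] = classOf[java.lang.Integer]

  def main(args: Array[String]): Unit = {
    println(scalaIntClass.getName)                    // int
    println(scalaIntClass.isPrimitive)                // true
    println(scalaIntClass == java.lang.Integer.TYPE)  // true
    println(boxedIntClass.getName)                    // java.lang.Integer
  }
}
```

So the CSV input format seems to be handed the primitive class, which
would explain why it complains about 'int' and not about 'Integer'.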

I also couldn't find pre-defined TypeInformation implementations for
these basic Scala types (similar to what we have in BasicTypeInfo), or macro
code that synthesizes them.

Can somebody elaborate on that?

Thanks,
A.
