Hi there,

I cannot figure out how the Scala base types (e.g. scala.Int, scala.Double) are mapped to the Flink runtime.
It seems that they are not treated the same as their Java counterparts (e.g. java.lang.Integer, java.lang.Double). For example, if I write the following code:

    val inputFormat = new CsvInputFormat[FlinkTuple3[Int, Int, String]](
      inPath, "\n", '\t',
      classOf[Int], classOf[Int], classOf[String])

    val typeInformation = new fjava.typeutils.TupleTypeInfo[FlinkTuple3[Int, Int, String]](
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO)

    env.createInput(inputFormat, typeInformation)

I get the following error:

    Exception in thread "main" java.lang.IllegalArgumentException: The type 'int' is not supported for the CSV input format.
        at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:158)
        at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:133)
        at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:83)
        ...

I also couldn't really find pre-defined TypeInformation implementations for these basic Scala types (similar to what we have in BasicTypeInfo), or macro code that synthesizes them. Can somebody elaborate on that?

Thanks,
A.
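
P.S. My current guess (not verified against the Flink sources) is that classOf[Int] in Scala resolves to the *primitive* class int rather than the boxed java.lang.Integer, and the error message suggests the CSV input format rejects primitive classes. A quick snippet showing the distinction:

    // classOf[Int] is the primitive class, the same object as java.lang.Integer.TYPE
    println(classOf[Int])                              // prints: int
    println(classOf[java.lang.Integer])                // prints: class java.lang.Integer
    println(classOf[Int] == java.lang.Integer.TYPE)    // prints: true

If that is the case, then passing the boxed classes to the same constructor as above, i.e. classOf[java.lang.Integer] instead of classOf[Int], would presumably get past the check, but that feels like a workaround rather than the intended Scala-side mapping, hence my question.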