Hi everyone,
Can anyone please confirm that multiple readers are not supported when reading from CSV source files using the filesystem connector? I have tried several approaches, but the Job Manager always creates a single reader, which becomes a bottleneck.

Issue summary:
Flink provides filesystem connectors to support CSV file sources in both batch and streaming modes. However, when reading a CSV file, Flink uses a single reader to read the data in. For large CSV files, a single reader becomes a performance bottleneck for Flink applications. To deliver better performance, the Job Manager should create InputSplits for CSV files read through the filesystem connector. InputSplits create virtual chunks on top of a single file by dividing it into [offset, length] ranges. For example, a file of 759863287 bytes can be split into 4 InputSplits like the following:

InputSplit 0/4, start, end: [0, 189965822]
InputSplit 1/4, start, end: [189965822, 379931644]
InputSplit 2/4, start, end: [379931644, 569897466]
InputSplit 3/4, start, end: [569897466, 759863287]

Flink needs to add support for chunking CSV files into InputSplits so that multiple readers can be used to improve performance.

Sample App:

/*
 * Run with:
 * ./bin/flink run flink-test-0.1.jar --input /path/to/source --output /path/to/output
 */

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.FormatDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCSVTestJob {

    public static void main(String[] args) throws Exception {
        // Read params
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(
                        Thread.currentThread().getContextClassLoader()
                                .getResourceAsStream("flink-conf.yaml"))
                .mergeWith(ParameterTool.fromArgs(args))
                .mergeWith(ParameterTool.fromSystemProperties());

        // Create execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(4);

        // Create table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String path = parameterTool.get("input");

        // Create table schema
        Schema schema = Schema.newBuilder()
                .column("l_orderkey", DataTypes.INT())
                .column("l_quantity", DataTypes.DOUBLE())
                .column("l_extendedprice", DataTypes.DOUBLE())
                .column("l_discount", DataTypes.DOUBLE())
                .column("l_tax", DataTypes.DOUBLE())
                .build();

        // Create a temporary table and fill it with data
        // from `path` using the filesystem connector
        tableEnv.createTemporaryTable("LineItem",
                TableDescriptor.forConnector("filesystem")
                        .schema(schema)
                        .option("path", path)
                        .format(FormatDescriptor.forFormat("csv")
                                .option("field-delimiter", "|")
                                .build())
                        .build());

        // Execute a query on the LineItem table, which returns a Table as the result
        Table table = tableEnv.sqlQuery("SELECT * FROM LineItem");

        // Convert the result table to a DataStream to dump the result to disk
        // (LineItemOutput is the result POJO; its definition is not shown here)
        DataStream<LineItemOutput> stream = tableEnv.toDataStream(table, LineItemOutput.class);

        // Dump the stream to the output destination
        stream.sinkTo(FileSink.forRowFormat(new Path(parameterTool.get("output")),
                        new SimpleStringEncoder<LineItemOutput>()).build())
                .setParallelism(4)
                .name("FlinkTestJob");

        env.execute();
    }
}
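To make the chunking concrete, here is a minimal plain-Java sketch of the [offset, length] splitting described above (the class name is hypothetical; this is not existing Flink API). It reproduces the four split boundaries from the example using ceiling division, so the last split absorbs the remainder. A real CSV reader would additionally need to align each split to row boundaries, e.g. by skipping to the first line delimiter after its start offset and reading past its end offset to finish the last row.

// Hypothetical illustration only, not Flink API
public class InputSplitSketch {

    public static void main(String[] args) {
        long fileSize = 759_863_287L; // file size from the example above
        int numSplits = 4;

        // Ceiling division: every split is the same size except the last,
        // which takes whatever bytes remain (189_965_822 here)
        long splitSize = (fileSize + numSplits - 1) / numSplits;

        for (int i = 0; i < numSplits; i++) {
            long start = (long) i * splitSize;
            long end = Math.min(start + splitSize, fileSize); // exclusive end
            // Prints the same boundaries as the example:
            // [0, 189965822], [189965822, 379931644], ...
            System.out.printf("InputSplit %d/%d, start, end: [%d, %d]%n",
                    i, numSplits, start, end);
        }
    }
}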
Yasin Celik