Hi everyone,
Can anyone please confirm that multiple readers are not supported when reading from CSV source files using the filesystem connector? I have tried several approaches, but the Job Manager always creates a single reader, which becomes a bottleneck.

Issue summary:
Flink provides a filesystem connector that supports CSV file sources in both batch and streaming modes. However, when reading a CSV file, Flink uses a single reader to read the data in. For large CSV files, a single reader becomes a performance bottleneck for Flink applications. To deliver better performance, the Job Manager should create InputSplits for CSV files read through the filesystem connector. InputSplits create virtual chunks on top of a single file by dividing it into [offset, length] ranges. For example, a file of 759863287 bytes can be split into 4 InputSplits like the following:

InputSplit 0/4, start, end: [0, 189965822]
InputSplit 1/4, start, end: [189965822, 379931644]
InputSplit 2/4, start, end: [379931644, 569897466]
InputSplit 3/4, start, end: [569897466, 759863287]
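For reference, the boundaries above follow from ceiling-dividing the file size by the number of splits and clamping the last split to the end of the file. A minimal, self-contained sketch of that arithmetic (only the example numbers above, not any Flink API):

public class SplitBoundaries {
    public static void main(String[] args) {
        long fileSize = 759863287L; // example file size from above
        int numSplits = 4;
        // Ceiling division: every split except possibly the last has equal length
        long splitLength = (fileSize + numSplits - 1) / numSplits; // 189965822
        for (int i = 0; i < numSplits; i++) {
            long start = i * splitLength;
            long end = Math.min(start + splitLength, fileSize); // clamp the final split
            System.out.printf("InputSplit %d/%d, start, end: [%d, %d]%n",
                i, numSplits, start, end);
        }
    }
}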
Flink needs to add support for chunking CSV files into InputSplits so that multiple readers can improve performance.

Sample App:
/*
./bin/flink run flink-test-0.1.jar --input /path/to/source --output /path/to/output
*/
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.FormatDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCSVTestJob {

    public static void main(String[] args) throws Exception {
        // Read params
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(
                Thread.currentThread().getContextClassLoader().getResourceAsStream("flink-conf.yaml"))
            .mergeWith(ParameterTool.fromArgs(args))
            .mergeWith(ParameterTool.fromSystemProperties());

        // Create execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(4);

        // Create table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        String path = parameterTool.get("input");

        // Create table schema
        Schema schema = Schema.newBuilder()
            .column("l_orderkey", DataTypes.INT())
            .column("l_quantity", DataTypes.DOUBLE())
            .column("l_extendedprice", DataTypes.DOUBLE())
            .column("l_discount", DataTypes.DOUBLE())
            .column("l_tax", DataTypes.DOUBLE())
            .build();

        // This creates a temp table and fills the table with data
        // from path using the filesystem connector
        tableEnv.createTemporaryTable("LineItem",
            TableDescriptor.forConnector("filesystem")
                .schema(schema)
                .option("path", path)
                .format(FormatDescriptor.forFormat("csv")
                    .option("field-delimiter", "|")
                    .build())
                .build());

        // Execute a query on the LineItem table, which returns a table as the result
        Table table = tableEnv.sqlQuery("SELECT * FROM LineItem");

        // Convert the result table to a DataStream to dump the result to disk
        DataStream<LineItemOutput> stream = tableEnv.toDataStream(table, LineItemOutput.class);

        // Dump the stream to the output destination
        stream.sinkTo(FileSink.forRowFormat(new Path(parameterTool.get("output")),
                new SimpleStringEncoder<LineItemOutput>()).build())
            .setParallelism(4)
            .name("FlinkTestJob");

        env.execute();
    }
}
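The sample references a LineItemOutput POJO that is not shown above. A minimal sketch of what it could look like, assuming it simply mirrors the five schema columns and relies on toString() for the SimpleStringEncoder output:

public class LineItemOutput {
    // Flink POJOs need public fields (or getters/setters) and a public no-arg constructor
    public Integer l_orderkey;
    public Double l_quantity;
    public Double l_extendedprice;
    public Double l_discount;
    public Double l_tax;

    public LineItemOutput() {}

    @Override
    public String toString() {
        // SimpleStringEncoder writes this representation, one record per line
        return l_orderkey + "|" + l_quantity + "|" + l_extendedprice
            + "|" + l_discount + "|" + l_tax;
    }
}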
Yasin Celik