Hi everyone,

Can anyone please confirm that multiple readers are not supported when reading from CSV source files using the filesystem connector? I have tried several approaches, but the Job Manager always creates a single reader, which becomes a bottleneck.

Issue summary:

Flink provides filesystem connectors to support CSV file sources in both batch and streaming modes. However, when reading a CSV file, Flink uses a single reader to read the data in, and a single reader becomes a performance bottleneck for large CSV files. To deliver better performance, the Job Manager should create InputSplits for CSV files read through the filesystem connector. InputSplits create virtual chunks on top of a single file by dividing it into [offset, length] ranges. For example, a file of 759863287 bytes can be split into 4 InputSplits like the following:

InputSplit 0/4, start, end: [0, 189965822]
InputSplit 1/4, start, end: [189965822, 379931644]
InputSplit 2/4, start, end: [379931644, 569897466]
InputSplit 3/4, start, end: [569897466, 759863287]

Flink needs to add support for CSV file chunking using InputSplits so that multiple readers can improve performance.
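
For illustration, the split boundaries above follow from simple ceiling-division arithmetic over the file size. Here is a minimal sketch (the SplitCalculator class and computeSplits helper are hypothetical, not Flink APIs); note that a real implementation would also have to extend each range to the next newline so that no CSV record is cut in half:

public class SplitCalculator {
  // Divide a file of fileSize bytes into n [start, end) byte ranges
  // using ceiling division, reproducing the sample boundaries above.
  static long[][] computeSplits(long fileSize, int n) {
    long chunk = (fileSize + n - 1) / n; // ceiling division
    long[][] splits = new long[n][2];
    for (int i = 0; i < n; i++) {
      splits[i][0] = i * chunk;                           // start offset
      splits[i][1] = Math.min((i + 1) * chunk, fileSize); // end offset
    }
    return splits;
  }

  public static void main(String[] args) {
    // Prints the four [start, end) ranges for the 759863287-byte sample file
    for (long[] s : computeSplits(759863287L, 4)) {
      System.out.printf("start, end: [%d, %d]%n", s[0], s[1]);
    }
  }
}
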
Sample App:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.FormatDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/*
./bin/flink run flink-test-0.1.jar --input /path/to/source --output /path/to/output
*/
public class FlinkCSVTestJob {
  public static void main(String[] args) throws Exception {
    // Read params
    ParameterTool parameterTool = ParameterTool.fromPropertiesFile(
            Thread.currentThread().getContextClassLoader().getResourceAsStream("flink-conf.yaml"))
        .mergeWith(ParameterTool.fromArgs(args))
        .mergeWith(ParameterTool.fromSystemProperties());

    // Create execution env
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.setParallelism(4);

    // Create table env
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    String path = parameterTool.get("input");

    // Create table schema
    Schema schema = Schema.newBuilder()
        .column("l_orderkey", DataTypes.INT())
        .column("l_quantity", DataTypes.DOUBLE())
        .column("l_extendedprice", DataTypes.DOUBLE())
        .column("l_discount", DataTypes.DOUBLE())
        .column("l_tax", DataTypes.DOUBLE())
        .build();

    // This creates a temp table and fills it with data
    // from path using the filesystem connector
    tableEnv.createTemporaryTable("LineItem",
        TableDescriptor.forConnector("filesystem")
            .schema(schema)
            .option("path", path)
            .format(FormatDescriptor.forFormat("csv")
                .option("field-delimiter", "|")
                .build())
            .build());

    // Execute a query on the LineItem table, which returns a table as the result
    Table table = tableEnv.sqlQuery("SELECT * FROM LineItem");

    // Convert the result table to a DataStream to dump the result to disk
    DataStream<LineItemOutput> stream = tableEnv.toDataStream(table, LineItemOutput.class);

    // Dump the stream to the output destination
    stream.sinkTo(FileSink.forRowFormat(new Path(parameterTool.get("output")),
            new SimpleStringEncoder<LineItemOutput>()).build())
        .setParallelism(4).name("FlinkTestJob");
    env.execute();
  }
}
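
The job references a LineItemOutput POJO that is not shown above; a minimal sketch of what it could look like (the class is assumed, with field names mirroring the table schema, and toString() producing what SimpleStringEncoder writes out):

public class LineItemOutput {
  // Public fields so the Table-to-DataStream conversion can map columns by name
  public Integer l_orderkey;
  public Double l_quantity;
  public Double l_extendedprice;
  public Double l_discount;
  public Double l_tax;

  @Override
  public String toString() {
    return l_orderkey + "|" + l_quantity + "|" + l_extendedprice
        + "|" + l_discount + "|" + l_tax;
  }
}
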
Yasin Celik
