Hi,

I wanted to spend some more time on the idea I asked about roughly a year ago.

So I had a look at the actual code and found the place where the file splits
are calculated:


https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L601

What I realized is that the current system for determining the
FileInputSplits apparently works like this:
1) It lists the collection of input files.
2) If a file is NOT splittable, a global flag is set that marks the whole
input as unsplittable.
3) ONLY if all of the files are splittable will they be split.

So (if I understand correctly) if I have an input directory with a 100 GB
uncompressed text file and a 1 KB gzipped file, then NEITHER will be split.
Why is it done this way?
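
To check that I am reading this right, here is a toy model of the behaviour
described above. It is NOT the Flink code: the class InputFile and both
counting methods are names I made up for illustration, and "splitting" is
reduced to counting how many files would be eligible for splitting.

```java
import java.util.Arrays;
import java.util.List;

class SplitDecisionSketch {

    /** Hypothetical stand-in for one file in the input directory. */
    static final class InputFile {
        final String name;
        final long length;
        final boolean splittable;

        InputFile(String name, long length, boolean splittable) {
            this.name = name;
            this.length = length;
            this.splittable = splittable;
        }
    }

    /** Behaviour as I understand it: one unsplittable file disables splitting for all files. */
    static long countSplittableGlobalFlag(List<InputFile> files) {
        boolean unsplittable = false;
        for (InputFile f : files) {
            if (!f.splittable) {
                unsplittable = true; // global flag, never reset for later files
            }
        }
        return unsplittable ? 0 : files.size();
    }

    /** Alternative: decide per file, so only the unsplittable files are read as a whole. */
    static long countSplittablePerFile(List<InputFile> files) {
        long count = 0;
        for (InputFile f : files) {
            if (f.splittable) {
                count++;
            }
        }
        return count;
    }

    /** The example from this mail: a 100 GB text file next to a 1 KB gzipped file. */
    static List<InputFile> exampleDirectory() {
        return Arrays.asList(
                new InputFile("big.txt", 100L * 1024 * 1024 * 1024, true),
                new InputFile("small.txt.gz", 1024L, false));
    }

    public static void main(String[] args) {
        List<InputFile> files = exampleDirectory();
        System.out.println("global flag: " + countSplittableGlobalFlag(files)); // prints 0
        System.out.println("per file:    " + countSplittablePerFile(files));    // prints 1
    }
}
```

With the global flag the 100 GB file ends up in a single split; with a
per-file decision only the tiny gzipped file would.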

I also found that whether a file is treated as NOT splittable depends
entirely on whether its decompression is provided by an
InflaterInputStreamFactory.
That is odd, because BZIP2 IS a splittable compression format, yet according
to this implementation it isn't.

I also noticed that the Hadoop split calculation is controlled by a split
size, whereas the Flink implementation is controlled by the number of splits.
This seems logical to me, as in Hadoop (MapReduce) the number of tasks
running is dynamic, in contrast with Flink, where the number of parallel
tasks is a given setting.
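
As a rough illustration of the difference (simplified from my reading of both
code bases, so treat it as a sketch and not as either project's actual
implementation; the class and method names are mine):

```java
class SplitSizeSketch {

    /** Hadoop-style: the split size follows from configured bounds and the block size. */
    static long hadoopStyleSplitSize(long minSize, long maxSize, long blockSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Flink-style: an upper bound on the split size follows from the requested number of splits. */
    static long flinkStyleMaxSplitSize(long totalLength, int minNumSplits) {
        // Integer division rounded up, so minNumSplits splits cover the whole input.
        return totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Size-driven: the number of splits grows with the amount of input.
        System.out.println(hadoopStyleSplitSize(1L, Long.MAX_VALUE, 128 * mb));
        // Count-driven: the split size grows with the amount of input.
        System.out.println(flinkStyleMaxSplitSize(1000 * mb, 8));
    }
}
```

In the first case the amount of input determines how many splits you get; in
the second the requested number of splits determines how big they become.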

Looking at how Hadoop does this, I see that the FileInputFormat has a
method isSplitable (yes, with a typo, and with a terribly dangerous default
value: MAPREDUCE-2094) which gives the answer on a per-file basis.
Some input formats (like TextInputFormat) look at the codec that was found
to see if it implements the SplittableCompressionCodec interface to
determine if the file is splittable.
Other file formats (like Avro and Parquet) use something else to determine
this.
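
A minimal sketch of that per-file, codec-based check, without any Hadoop
dependency. Codec, SplittableCodec, the two codec classes and the extension
lookup are hypothetical stand-ins that I made up to model the idea; they are
not the Hadoop or Flink types.

```java
import java.util.HashMap;
import java.util.Map;

class PerFileSplittableSketch {

    /** Hypothetical stand-in for a compression codec. */
    interface Codec { }

    /** Hypothetical marker for codecs whose streams can be read starting mid-file. */
    interface SplittableCodec extends Codec { }

    static final class GzipCodec implements Codec { }

    static final class Bzip2Codec implements SplittableCodec { }

    private static final Map<String, Codec> CODECS_BY_EXTENSION = new HashMap<>();

    static {
        CODECS_BY_EXTENSION.put("gz", new GzipCodec());
        CODECS_BY_EXTENSION.put("bz2", new Bzip2Codec());
    }

    /** Returns the codec registered for the file extension, or null if there is none. */
    static Codec findCodec(String fileName) {
        int dot = fileName.lastIndexOf('.');
        if (dot < 0) {
            return null;
        }
        return CODECS_BY_EXTENSION.get(fileName.substring(dot + 1));
    }

    /** Per-file answer: uncompressed files and splittable codecs can be split. */
    static boolean isSplittable(String fileName) {
        Codec codec = findCodec(fileName);
        if (codec == null) {
            return true; // no compression detected
        }
        return codec instanceof SplittableCodec;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"data.txt", "data.txt.gz", "data.txt.bz2"}) {
            System.out.println(name + " splittable: " + isSplittable(name));
        }
    }
}
```

The point is that the answer is given per file and depends on a property of
the codec, so a bzip2 file and a gzip file in the same directory can be
treated differently.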

Overall I currently think the way this has been implemented in Flink is not
very good.

However, bridging the gap to make it similar to what Hadoop has seems like
a huge step.

I expect that many classes and interfaces will need to change dramatically
to make this happen.

My main question to you guys: Do you think it is worth it?

Niels Basjes


On Mon, Feb 19, 2018 at 10:50 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Niels,
>
> Jörn is right: although it offers different methods, Flink's InputFormat is
> very similar to Hadoop's InputFormat interface.
> The InputFormat.createInputSplits() method generates splits that can be
> read in parallel.
> The FileInputFormat splits files by fixed boundaries (usually HDFS
> blocksize) and expects the InputFormat to find the right place to start and
> end.
> For files read line by line (TextInputFormat) or files with a record delimiter
> (DelimitedInputFormat), the formats start reading at the first record after
> the first delimiter in their split and stop at the first delimiter after
> the split boundary.
> The BinaryInputFormat extends FileInputFormat but overrides the
> createInputSplits method.
>
> So, how exactly a file is read in parallel depends on the
> createInputSplits() method of the InputFormat.
>
> Hope this helps,
> Fabian
>
>
> 2018-02-18 13:36 GMT+01:00 Jörn Franke <jornfra...@gmail.com>:
>
> > AFAIK Flink has a similar notion of splittable as Hadoop. Furthermore, for
> > custom FileInputFormats you can set the attribute unsplittable = true if
> > your file format cannot be split.
> >
> > > On 18. Feb 2018, at 13:28, Niels Basjes <ni...@basjes.nl> wrote:
> > >
> > > Hi,
> > >
> > > In Hadoop MapReduce there is the notion of "splittable" in the
> > > FileInputFormat. This has the effect that a single input file can be fed
> > > into multiple separate instances of the mapper that read the data.
> > > A lot has been documented (e.g. text is splittable per line, gzipped text
> > > is not splittable) and designed into the various file formats (like Avro
> > > and Parquet) to allow splittability.
> > >
> > > The goal is that reading and parsing files can be done by multiple
> > > cpus/systems in parallel.
> > >
> > > How is this handled in Flink?
> > > Can Flink read a single file in parallel?
> > > How does Flink administrate/handle the possibilities regarding the various
> > > file formats?
> > >
> > >
> > > The reason I ask is because I want to see if I can port this (now Hadoop
> > > specific) hobby project of mine to work with Flink:
> > > https://github.com/nielsbasjes/splittablegzip
> > >
> > > Thanks.
> > >
> > > --
> > > Best regards / Met vriendelijke groeten,
> > >
> > > Niels Basjes
> >
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
