[
https://issues.apache.org/jira/browse/FLINK-2503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Maximilian Michels updated FLINK-2503:
--------------------------------------
Description:
From a thread in the user mailing list (Invalid argument reading a file
containing a Kryo object).
I think there are some inconsistencies in the hierarchy of InputFormats.
The BinaryOutputFormat/TypeSerializerInputFormat should inherit the
behaviour of the FileInputFormat (that is, respect unsplittable and
enumerateNestedFiles), but they don't take those flags into account.
Moreover, in the TypeSerializerInputFormat there's a "// TODO: fix this shit"
that should probably be removed or fixed :)
Also, having to keep testForUnsplittable and decorateInputStream aligned is
somewhat dangerous.
And maybe the visibility of getBlockIndexForPosition should be changed to
protected?
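To make the flag problem concrete, here is a minimal, self-contained sketch. It is not Flink code: the classes `SketchFileFormat` and `SketchBinaryFormat`, the `createSplits` signature, and the `[start, length]` split representation are all hypothetical stand-ins. It only shows the general pattern, assuming the subclass overrides split creation without consulting the flag: the base class honours `unsplittable`, the subclass silently ignores it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a file-based input format (not Flink's FileInputFormat). */
class SketchFileFormat {
    protected boolean unsplittable = false;

    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    /** Returns [start, length] pairs covering a file of fileLength bytes. */
    public List<long[]> createSplits(long fileLength, int minNumSplits) {
        List<long[]> splits = new ArrayList<>();
        // The base class respects the flag: an unsplittable file becomes one split.
        if (unsplittable || minNumSplits <= 1) {
            splits.add(new long[] {0L, fileLength});
            return splits;
        }
        // Otherwise cut the file into roughly equal pieces (ceiling division).
        long splitSize = Math.max(1L, (fileLength + minNumSplits - 1) / minNumSplits);
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new long[] {start, Math.min(splitSize, fileLength - start)});
        }
        return splits;
    }
}

/** Hypothetical block-oriented subclass that overrides split creation. */
class SketchBinaryFormat extends SketchFileFormat {
    private final long blockSize;

    SketchBinaryFormat(long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        this.blockSize = blockSize;
    }

    @Override
    public List<long[]> createSplits(long fileLength, int minNumSplits) {
        // The override never looks at 'unsplittable', so the flag has no effect here.
        List<long[]> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += blockSize) {
            splits.add(new long[] {start, Math.min(blockSize, fileLength - start)});
        }
        return splits;
    }
}
```

In this sketch, calling `setUnsplittable(true)` on a `SketchFileFormat` yields a single split, while the same call on a `SketchBinaryFormat` changes nothing. That is the kind of surprise described above.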
My need was to implement a TypeSerializerInputFormat<RowBundle>, but to achieve
that I had to write a lot of overrides. Am I doing something wrong, or do those
input formats need improving? This is my IF code (remark: everything after the
comment "Copied from FileInputFormat (overriding TypeSerializerInputFormat)" is
copied-and-pasted from FileInputFormat, so MY code ends there):
{code:java}
public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);

    /** The fraction that the last split may be larger than the others. */
    private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

    private boolean objectRead;

    public RowBundleInputFormat() {
        super(new GenericTypeInfo<>(RowBundle.class));
        unsplittable = true;
    }

    @Override
    protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
            FileInputSplit fileSplit) throws Throwable {
        return inputStream;
    }

    @Override
    protected boolean testForUnsplittable(FileStatus pathFile) {
        return true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        objectRead = false;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return this.objectRead;
    }

    @Override
    public RowBundle nextRecord(RowBundle reuse) throws IOException {
        RowBundle yourObject = super.nextRecord(reuse);
        this.objectRead = true; // read only one object
        return yourObject;
    }

    // -------------------------------------------------------------------
    // Copied from FileInputFormat (overriding TypeSerializerInputFormat)
    // -------------------------------------------------------------------

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {...}

    private long addNestedFiles(Path path, List<FileStatus> files, long length,
            boolean logExcludedFiles) throws IOException {...}

    private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
            long halfSplitSize, int startIndex) { ... }
}
{code}
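The "read only one object per split" part of the code above can be tried in isolation with the following sketch. It does not use Flink: `OneRecordReader`, its `open(byte[])` method, and the use of `DataInputStream.readUTF` as the record encoding are assumptions made purely for illustration. The point is only the `objectRead` flag, which is reset on `open` and set after the first `nextRecord`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical reader that yields exactly one record per opened "file". */
class OneRecordReader {
    private DataInputStream in;
    private boolean objectRead;

    /** Opens a new in-memory "file" and resets the end-of-input flag. */
    void open(byte[] fileContents) {
        in = new DataInputStream(new ByteArrayInputStream(fileContents));
        objectRead = false;
    }

    /** True once the single record of the current file has been returned. */
    boolean reachedEnd() {
        return objectRead;
    }

    /** Reads the single record and marks the input as exhausted. */
    String nextRecord() {
        try {
            String record = in.readUTF();
            objectRead = true; // read only one object
            return record;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Helper for the sketch: serializes one record into a byte array. */
    static byte[] writeRecord(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

A caller would loop with `while (!reader.reachedEnd()) { reader.nextRecord(); }`, which runs the body once per opened file. This pattern only works if each file really ends up in a single split, which is why the unsplittable handling matters.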
> Inconsistencies in FileInputFormat hierarchy
> --------------------------------------------
>
> Key: FLINK-2503
> URL: https://issues.apache.org/jira/browse/FLINK-2503
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: master
> Reporter: Flavio Pompermaier
> Priority: Minor
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)