[
https://issues.apache.org/jira/browse/FLINK-32593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743339#comment-17743339
]
Zhaofu Liu commented on FLINK-32593:
------------------------------------
One possible fix is to adjust the #open(FileInputSplit split) method in
DelimitedInputFormat.java.
Current method implementation:
{code:java}
// code placeholder
/**
 * Opens the given split and positions the reader before reading its records.
 *
 * <p>Steps, in order: delegate to {@code super.open(split)}, call {@code initBuffers()},
 * set {@code offset} to {@code splitStart}, then either skip ahead with one
 * {@code readLine()} call (non-zero split start) or call {@code fillBuffer(0)}
 * (split start of zero), and finally call {@code initializeSplit(split, null)}.
 *
 * <p>NOTE(review): the skip-ahead scan begins exactly at {@code splitStart}. The issue
 * this snippet is quoted in (FLINK-32593) reports that a multi-byte delimiter whose
 * first bytes lie before {@code splitStart} is therefore not recognised here.
 *
 * @param split the file split to open
 * @throws IOException declared by this method; which of the calls below raise it is
 *     not visible in this snippet
 */
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
initBuffers();
this.offset = splitStart;
if (this.splitStart != 0) {
// The split does not begin at position 0: move the stream to the split start and
// call readLine() once, ignoring its result. Per the comment below, what is
// consumed here is the first, partial record.
this.stream.seek(offset);
readLine();
// if the first partial record already pushes the stream over
// the limit of our split, then no record starts within this split
if (this.overLimit) {
this.end = true;
}
} else {
// NOTE(review): presumably pre-loads the read buffer from the start of the file;
// confirm against fillBuffer(int), which is not shown here.
fillBuffer(0);
}
initializeSplit(split, null);
} {code}
Adjusted method implementation:
{code:java}
// code placeholder
/**
 * Opens the given split and positions the reader before reading its records.
 *
 * <p>Proposed fix for FLINK-32593: when the split does not start at the beginning of
 * the file, the bytes around the split start are inspected for a delimiter that begins
 * before the split start and ends inside the split. If one is found, the split is
 * re-opened with its start moved back by {@code delimiter.length - 1} bytes, so that
 * the subsequent {@code readLine()} skip sees the whole delimiter instead of only its
 * tail.
 *
 * <p>The inspected window covers {@code 2 * (delimiter.length - 1)} bytes beginning at
 * {@code split.getStart() - (delimiter.length - 1)}; a complete delimiter only fits in
 * that window if it starts before the split start.
 *
 * @param split the file split to open
 * @throws IOException if seeking, reading or closing the stream fails, or if one of
 *     the delegated calls throws it
 */
@Override
public void open(FileInputSplit split) throws IOException {
    super.open(split);

    // Number of bytes a delimiter can reach back across the split boundary.
    final int lookBehind = this.delimiter.length - 1;

    // Single-byte delimiters cannot straddle a boundary, and a split that starts
    // within the first lookBehind bytes of the file has no room for the look-behind
    // window (the seek position would be zero or negative); both cases keep the
    // unmodified behaviour.
    if (lookBehind > 0 && split.getStart() > lookBehind) {
        final long windowStart = split.getStart() - lookBehind;
        final byte[] window = new byte[2 * lookBehind];

        this.stream.seek(windowStart);

        // A single read() may return fewer bytes than requested; keep reading until
        // the window is full or the end of the file is reached.
        int filled = 0;
        while (filled < window.length) {
            final int readNow = this.stream.read(window, filled, window.length - filled);
            if (readNow < 0) {
                break;
            }
            filled += readNow;
        }

        if (containsDelimiter(window, filled, this.delimiter)) {
            final FileInputSplit extendedSplit =
                    new FileInputSplit(
                            split.getSplitNumber(),
                            split.getPath(),
                            windowStart,
                            split.getLength() + lookBehind,
                            split.getHostnames());
            // NOTE(review): super.open(...) is assumed to assign a fresh stream without
            // closing the previous one, so the first stream is closed here - confirm
            // against the superclass.
            this.stream.close();
            super.open(extendedSplit);
        }
    }

    initBuffers();
    this.offset = splitStart;
    if (this.splitStart != 0) {
        this.stream.seek(offset);
        readLine();
        // if the first partial record already pushes the stream over
        // the limit of our split, then no record starts within this split
        if (this.overLimit) {
            this.end = true;
        }
    } else {
        fillBuffer(0);
    }
    initializeSplit(split, null);
}

/** Returns whether {@code delim} occurs completely within the first {@code length} bytes of {@code window}. */
private static boolean containsDelimiter(byte[] window, int length, byte[] delim) {
    for (int start = 0; start + delim.length <= length; start++) {
        int matched = 0;
        while (matched < delim.length && window[start + matched] == delim[matched]) {
            matched++;
        }
        if (matched == delim.length) {
            return true;
        }
    }
    return false;
} {code}
> DelimitedInputFormat will cause record loss for multi-bytes delimit when a
> delimit is seperated to two splits
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-32593
> URL: https://issues.apache.org/jira/browse/FLINK-32593
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.16.1, 1.16.2, 1.17.1
> Reporter: Zhaofu Liu
> Priority: Major
> Attachments: 5parallel.dat, image-2023-07-15-10-30-03-740.png
>
>
> Run the following test to reproduce this bug.
> {code:java}
> // code placeholder
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.io.DelimitedInputFormat;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.junit.Test;
> import javax.xml.bind.DatatypeConverter;
> import java.io.IOException;
> public class MyTest {
> @Test
> public void myTest() throws Exception {
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(5);
> String path =
> MyTest.class.getClassLoader().getResource("5parallel.dat").getPath();
> final DelimitedInputFormat<byte[]> inputFormat = new TestInputFormat();
> // The delimiter is "B87E7E7E"
> inputFormat.setDelimiter(new byte[]{(byte) 184, (byte) 126, (byte) 126,
> (byte) 126});
> // Set buffer size less than default value of 1M for easily debugging
> inputFormat.setBufferSize(128);
> DataStreamSource<byte[]> source = env.readFile(inputFormat, path);
> source.map(new MapFunction<byte[], Object>() {
> @Override
> public Object map(byte[] value) throws Exception {
> System.out.println(DatatypeConverter.printHexBinary(value));
> return value;
> }
> }).setParallelism(1);
> env.execute();
> }
> private class TestInputFormat extends DelimitedInputFormat<byte[]> {
> @Override
> public byte[] readRecord(byte[] reuse, byte[] bytes, int offset, int
> numBytes) throws IOException {
> final int delimiterLen = this.getDelimiter().length;
> if (numBytes > 0) {
> byte[] record = new byte[delimiterLen + numBytes];
> System.arraycopy(this.getDelimiter(), 0, record, 0, delimiterLen);
> System.arraycopy(bytes, offset, record, delimiterLen, numBytes);
> return record;
> }
> return new byte[0];
> }
> }
> }
> {code}
>
> The actual output is:
> {code:java}
> // code placeholder
> B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
> B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
> B87E7E7E1A00EB900A4EDC6D5516 {code}
>
> The expected output should be:
> {code:java}
> // code placeholder
> B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
> B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0
> B87E7E7E1A00EB900A4EDC6D5516
> B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181 {code}
> The image below shows a delimiter that is split across two splits (the tail
> of line 2 and the head of line 3):
> !image-2023-07-15-10-30-03-740.png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)