[ 
https://issues.apache.org/jira/browse/FLINK-32593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743339#comment-17743339
 ] 

Zhaofu Liu commented on FLINK-32593:
------------------------------------

One fix for this is to adjust the #open(FileInputSplit split) method in the 
file of DelimitedInputFormat.java.

 

Current method implementation:
{code:java}
// code placeholder
@Override
public void open(FileInputSplit split) throws IOException {
    super.open(split);
    initBuffers();

    this.offset = splitStart;
    if (this.splitStart != 0) {
        this.stream.seek(offset);
        readLine();
        // if the first partial record already pushes the stream over
        // the limit of our split, then no record starts within this split
        if (this.overLimit) {
            this.end = true;
        }
    } else {
        fillBuffer(0);
    }
    initializeSplit(split, null);
} {code}
Adjusted method implementation:
{code:java}
// code placeholder
@Override
public void open(FileInputSplit split) throws IOException {
  super.open(split);

  // The main idea is to include the seperated delimit part in the tail of last 
split to the head of the next split
  if (split.getStart() > 0) {
    int delimLength = this.delimiter.length;
    int splitHeadLength = 2 * (delimLength - 1);

    byte[] splitHead = new byte[splitHeadLength];

    this.stream.seek(split.getStart() - delimLength + 1);
    this.stream.read(splitHead, 0, splitHeadLength);

    int delimPos = 0;
    int searchPos = 0;
    while ((searchPos < splitHeadLength && delimPos < delimLength)) {
      if (splitHead[searchPos] == this.delimiter[delimPos]) {
        // Found the expected delimiter character. Continue looking for the next
        // character of delimiter.
        delimPos++;
      } else {
        // Delimiter does not match.
        // We have to reset the read position to the character after the first 
matching
        // character
        //   and search for the whole delimiter again.
        searchPos -= delimPos;
        delimPos = 0;
      }
      searchPos++;
    }

    if (delimPos == delimLength) {
      FileInputSplit extendedSplit = new FileInputSplit(split.getSplitNumber(), 
split.getPath(),
          split.getStart() - delimLength + 1, split.getLength() + delimLength - 
1, split.getHostnames());
      super.open(extendedSplit);
    }
  }

  initBuffers();

  this.offset = splitStart;
  if (this.splitStart != 0) {
    this.stream.seek(offset);
    readLine();
    // if the first partial record already pushes the stream over
    // the limit of our split, then no record starts within this split
    if (this.overLimit) {
      this.end = true;
    }
  } else {
    fillBuffer(0);
  }
  initializeSplit(split, null);
} {code}

> DelimitedInputFormat will cause record loss for multi-bytes delimit when a 
> delimit is seperated to two splits
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32593
>                 URL: https://issues.apache.org/jira/browse/FLINK-32593
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.16.1, 1.16.2, 1.17.1
>            Reporter: Zhaofu Liu
>            Priority: Major
>         Attachments: 5parallel.dat, image-2023-07-15-10-30-03-740.png
>
>
> Run the following test to reproduce this bug.
> {code:java}
> // code placeholder
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.io.DelimitedInputFormat;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.junit.Test;
> import javax.xml.bind.DatatypeConverter;
> import java.io.IOException;
> public class MyTest {
>   @Test
>   public void myTest() throws Exception {
>     final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(5);
>     String path = 
> MyTest.class.getClassLoader().getResource("5parallel.dat").getPath();
>     final DelimitedInputFormat<byte[]> inputFormat = new TestInputFormat();
>     // The delimiter is "B87E7E7E"
>     inputFormat.setDelimiter(new byte[]{(byte) 184, (byte) 126, (byte) 126, 
> (byte) 126});
>     // Set buffer size less than default value of 1M for easily debugging
>     inputFormat.setBufferSize(128);
>     DataStreamSource<byte[]> source = env.readFile(inputFormat, path);
>     source.map(new MapFunction<byte[], Object>() {
>       @Override
>       public Object map(byte[] value) throws Exception {
>         System.out.println(DatatypeConverter.printHexBinary(value));
>         return value;
>       }
>     }).setParallelism(1);
>     env.execute();
>   }
>   private class TestInputFormat extends DelimitedInputFormat<byte[]> {
>     @Override
>     public byte[] readRecord(byte[] reuse, byte[] bytes, int offset, int 
> numBytes) throws IOException {
>       final int delimiterLen = this.getDelimiter().length;
>       if (numBytes > 0) {
>         byte[] record = new byte[delimiterLen + numBytes];
>         System.arraycopy(this.getDelimiter(), 0, record, 0, delimiterLen);
>         System.arraycopy(bytes, offset, record, delimiterLen, numBytes);
>         return record;
>       }
>       return new byte[0];
>     }
>   }
> }
>  {code}
>  
> The actually output result is:
> {code:java}
> // code placeholder
> B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
> B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181
> B87E7E7E1A00EB900A4EDC6D5516 {code}
>  
> The expected output result shoud be:
> {code:java}
> // code placeholder
> B87E7E7E1A00EB900A4EDC6850160070F6BED4631321ADDC6F06DC137C221E99
> B87E7E7E1A00EB900A4EDC6B52150070F6BE468EFD20BEEEB756E03FD7F653D0
> B87E7E7E1A00EB900A4EDC6D5516
> B87E7E7E1A00EB900A4EDC6A51160070F61A8AFE022A3EC67718002A217C2181 {code}
> The view of a delimit is seperated to two splits (The tail of line 2 and head 
> of line 3):
> !image-2023-07-15-10-30-03-740.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to