This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c9a706b8388b324a37da43298e37074d0a452a34 Author: zhouli <[email protected]> AuthorDate: Tue Jun 14 15:40:46 2022 +0800 [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits This closes #19946. --- .../flink/api/common/io/BinaryInputFormat.java | 2 +- .../flink/api/common/io/BinaryInputFormatTest.java | 62 ++++++++++++++++++---- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index 93bf43d72c8..75f4bb2a7fa 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -147,7 +147,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> FileStatus last = files.get(files.size() - 1); final BlockLocation[] blocks = last.getPath().getFileSystem().getFileBlockLocations(last, 0, last.getLen()); - for (int index = files.size(); index < minNumSplits; index++) { + for (int index = inputSplits.size(); index < minNumSplits; index++) { inputSplits.add( new FileInputSplit( index, last.getPath(), last.getLen(), 0, blocks[0].getHosts())); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java index 480995f1132..7de496606ae 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java @@ -24,7 +24,9 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.types.Record; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.FileOutputStream; @@ -35,6 +37,8 @@ import static org.assertj.core.api.Fail.fail; public class BinaryInputFormatTest { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + private static final class MyBinaryInputFormat extends BinaryInputFormat<Record> { private static final long serialVersionUID = 1L; @@ -52,17 +56,14 @@ public class BinaryInputFormatTest { @Test public void testCreateInputSplitsWithOneFile() throws IOException { - // create temporary file with 3 blocks - final File tempFile = File.createTempFile("binary_input_format_test", "tmp"); - tempFile.deleteOnExit(); final int blockInfoSize = new BlockInfo().getInfoSize(); final int blockSize = blockInfoSize + 8; final int numBlocks = 3; - FileOutputStream fileOutputStream = new FileOutputStream(tempFile); - for (int i = 0; i < blockSize * numBlocks; i++) { - fileOutputStream.write(new byte[] {1}); - } - fileOutputStream.close(); + + // create temporary file with 3 blocks + final File tempFile = + createBinaryInputFile( + "test_create_input_splits_with_one_file", blockSize, numBlocks); final Configuration config = new Configuration(); config.setLong("input.block_size", blockSize + 10); @@ -169,11 +170,52 @@ public class BinaryInputFormatTest { .isEqualTo(blockSize * (numBlocks1 + numBlocks2)); } + @Test + public void testCreateInputSplitsWithEmptySplit() throws IOException { + final int blockInfoSize = new BlockInfo().getInfoSize(); + final int blockSize = blockInfoSize + 8; + final int numBlocks = 3; + final int minNumSplits = 5; + + // create temporary file with 3 blocks + final File tempFile = + createBinaryInputFile( + "test_create_input_splits_with_empty_split", blockSize, numBlocks); + + final Configuration config = new Configuration(); + config.setLong("input.block_size", blockSize + 10); + + final BinaryInputFormat<Record> inputFormat = new MyBinaryInputFormat(); + inputFormat.setFilePath(tempFile.toURI().toString()); + inputFormat.setBlockSize(blockSize); + + inputFormat.configure(config); + + FileInputSplit[] inputSplits = inputFormat.createInputSplits(minNumSplits); + + assertThat(inputSplits).as("Returns requested numbers of splits.").hasSize(minNumSplits); + + assertThat(inputSplits[0].getLength()) + .as("1. split should have block size length.") + .isEqualTo(blockSize); + + assertThat(inputSplits[1].getLength()) + .as("2. split should have block size length.") + .isEqualTo(blockSize); + + assertThat(inputSplits[2].getLength()) + .as("3. split should have block size length.") + .isEqualTo(blockSize); + + assertThat(inputSplits[3].getLength()).as("4. split should be an empty split.").isZero(); + + assertThat(inputSplits[4].getLength()).as("5. split should be an empty split.").isZero(); + } + /** Creates a temp file with a certain number of blocks of a certain size. */ private File createBinaryInputFile(String fileName, int blockSize, int numBlocks) throws IOException { - final File tempFile = File.createTempFile(fileName, "tmp"); - tempFile.deleteOnExit(); + final File tempFile = tempFolder.newFile(fileName); try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) { for (int i = 0; i < blockSize * numBlocks; i++) { fileOutputStream.write(new byte[] {1});
