This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9a706b8388b324a37da43298e37074d0a452a34
Author: zhouli <[email protected]>
AuthorDate: Tue Jun 14 15:40:46 2022 +0800

    [FLINK-28018][core] correct the start index for creating empty splits in BinaryInputFormat#createInputSplits
    
    This closes #19946.
---
 .../flink/api/common/io/BinaryInputFormat.java     |  2 +-
 .../flink/api/common/io/BinaryInputFormatTest.java | 62 ++++++++++++++++++----
 2 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 93bf43d72c8..75f4bb2a7fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -147,7 +147,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
             FileStatus last = files.get(files.size() - 1);
             final BlockLocation[] blocks =
                     last.getPath().getFileSystem().getFileBlockLocations(last, 
0, last.getLen());
-            for (int index = files.size(); index < minNumSplits; index++) {
+            for (int index = inputSplits.size(); index < minNumSplits; 
index++) {
                 inputSplits.add(
                         new FileInputSplit(
                                 index, last.getPath(), last.getLen(), 0, 
blocks[0].getHosts()));
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
index 480995f1132..7de496606ae 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
@@ -24,7 +24,9 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.types.Record;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -35,6 +37,8 @@ import static org.assertj.core.api.Fail.fail;
 
 public class BinaryInputFormatTest {
 
+    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
     private static final class MyBinaryInputFormat extends 
BinaryInputFormat<Record> {
 
         private static final long serialVersionUID = 1L;
@@ -52,17 +56,14 @@ public class BinaryInputFormatTest {
 
     @Test
     public void testCreateInputSplitsWithOneFile() throws IOException {
-        // create temporary file with 3 blocks
-        final File tempFile = File.createTempFile("binary_input_format_test", 
"tmp");
-        tempFile.deleteOnExit();
         final int blockInfoSize = new BlockInfo().getInfoSize();
         final int blockSize = blockInfoSize + 8;
         final int numBlocks = 3;
-        FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
-        for (int i = 0; i < blockSize * numBlocks; i++) {
-            fileOutputStream.write(new byte[] {1});
-        }
-        fileOutputStream.close();
+
+        // create temporary file with 3 blocks
+        final File tempFile =
+                createBinaryInputFile(
+                        "test_create_input_splits_with_one_file", blockSize, 
numBlocks);
 
         final Configuration config = new Configuration();
         config.setLong("input.block_size", blockSize + 10);
@@ -169,11 +170,52 @@ public class BinaryInputFormatTest {
                 .isEqualTo(blockSize * (numBlocks1 + numBlocks2));
     }
 
+    @Test
+    public void testCreateInputSplitsWithEmptySplit() throws IOException {
+        final int blockInfoSize = new BlockInfo().getInfoSize();
+        final int blockSize = blockInfoSize + 8;
+        final int numBlocks = 3;
+        final int minNumSplits = 5;
+
+        // create temporary file with 3 blocks
+        final File tempFile =
+                createBinaryInputFile(
+                        "test_create_input_splits_with_empty_split", 
blockSize, numBlocks);
+
+        final Configuration config = new Configuration();
+        config.setLong("input.block_size", blockSize + 10);
+
+        final BinaryInputFormat<Record> inputFormat = new 
MyBinaryInputFormat();
+        inputFormat.setFilePath(tempFile.toURI().toString());
+        inputFormat.setBlockSize(blockSize);
+
+        inputFormat.configure(config);
+
+        FileInputSplit[] inputSplits = 
inputFormat.createInputSplits(minNumSplits);
+
+        assertThat(inputSplits).as("Returns requested numbers of 
splits.").hasSize(minNumSplits);
+
+        assertThat(inputSplits[0].getLength())
+                .as("1. split should have block size length.")
+                .isEqualTo(blockSize);
+
+        assertThat(inputSplits[1].getLength())
+                .as("2. split should have block size length.")
+                .isEqualTo(blockSize);
+
+        assertThat(inputSplits[2].getLength())
+                .as("3. split should have block size length.")
+                .isEqualTo(blockSize);
+
+        assertThat(inputSplits[3].getLength()).as("4. split should be an empty 
split.").isZero();
+
+        assertThat(inputSplits[4].getLength()).as("5. split should be an empty 
split.").isZero();
+    }
+
     /** Creates a temp file with a certain number of blocks of a certain size. 
*/
     private File createBinaryInputFile(String fileName, int blockSize, int 
numBlocks)
             throws IOException {
-        final File tempFile = File.createTempFile(fileName, "tmp");
-        tempFile.deleteOnExit();
+        final File tempFile = tempFolder.newFile(fileName);
         try (FileOutputStream fileOutputStream = new 
FileOutputStream(tempFile)) {
             for (int i = 0; i < blockSize * numBlocks; i++) {
                 fileOutputStream.write(new byte[] {1});

Reply via email to