asfgit closed pull request #6466: [FLINK-10005][DataStream API] 
StreamingFileSink: sets initialPartCounter=maxUsed in new Buckets
URL: https://github.com/apache/flink/pull/6466
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its
diff after the merge, so the diff is reproduced below:

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index ec59233c0e5..a350096e38b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -115,6 +115,18 @@ public Bucket(
                this.pending = new ArrayList<>();
        }
 
+       /**
+        * Gets the information available for the currently
+        * open part file, i.e. the one we are currently writing to.
+        *
+        * <p>This will be null if there is no currently open part file. This
+        * is the case when we have a new, just created bucket or a bucket
+        * that has not received any data after the closing of its previously
+        * open in-progress file due to the specified rolling policy.
+        *
+        * @return The information about the currently in-progress part file
+        * or {@code null} if there is no open part file.
+        */
        public PartFileInfo<BucketID> getInProgressPartInfo() {
                return currentPart;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 7e9dd61e035..e62c425fc2f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -70,8 +70,6 @@
 
        private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
 
-       private long initMaxPartCounter;
-
        private long maxPartCounterUsed;
 
        private final RecoverableWriter fileSystemWriter;
@@ -114,7 +112,6 @@
                                bucketer.getSerializer()
                );
 
-               this.initMaxPartCounter = 0L;
                this.maxPartCounterUsed = 0L;
        }
 
@@ -137,7 +134,7 @@ void initializeState(final ListState<byte[]> bucketStates, 
final ListState<Long>
                for (long partCounter: partCounterState.get()) {
                        maxCounter = Math.max(partCounter, maxCounter);
                }
-               initMaxPartCounter = maxCounter;
+               maxPartCounterUsed = maxCounter;
 
                // get the restored buckets
                for (byte[] recoveredState : bucketStates.get()) {
@@ -151,7 +148,7 @@ void initializeState(final ListState<byte[]> bucketStates, 
final ListState<Long>
                        final Bucket<IN, BucketID> restoredBucket = 
bucketFactory.restoreBucket(
                                        fileSystemWriter,
                                        subtaskIndex,
-                                       initMaxPartCounter,
+                                       maxPartCounterUsed,
                                        partFileWriterFactory,
                                        bucketState
                        );
@@ -200,8 +197,6 @@ void snapshotState(
                        final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
 
                        if (info != null && 
rollingPolicy.shouldRollOnCheckpoint(info)) {
-                               // we also check here so that we do not have to 
always
-                               // wait for the "next" element to arrive.
                                bucket.closePartFile();
                        }
 
@@ -237,13 +232,19 @@ void onElement(IN value, SinkFunction.Context context) 
throws Exception {
                                        subtaskIndex,
                                        bucketId,
                                        bucketPath,
-                                       initMaxPartCounter,
+                                       maxPartCounterUsed,
                                        partFileWriterFactory);
                        activeBuckets.put(bucketId, bucket);
                }
 
                final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
                if (info == null || rollingPolicy.shouldRollOnEvent(info, 
value)) {
+
+                       // info will be null if there is no currently open part 
file. This
+                       // is the case when we have a new, just created bucket 
or a bucket
+                       // that has not received any data after the closing of 
its previously
+                       // open in-progress file due to the specified rolling 
policy.
+
                        bucket.rollPartFile(currentProcessingTime);
                }
                bucket.write(value, currentProcessingTime);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 4b1e7436772..a0c438e1847 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -323,7 +323,7 @@ public void testInactivityPeriodWithLateNotify() throws 
Exception {
                                        Assert.assertEquals("test1@1\n", 
fileContents.getValue());
                                } else if 
(fileContents.getKey().getParentFile().getName().equals("test2")) {
                                        bucketCounter++;
-                                       Assert.assertEquals("part-0-0", 
fileContents.getKey().getName());
+                                       Assert.assertEquals("part-0-1", 
fileContents.getKey().getName());
                                        Assert.assertEquals("test2@1\n", 
fileContents.getValue());
                                } else if 
(fileContents.getKey().getParentFile().getName().equals("test3")) {
                                        bucketCounter++;
@@ -346,11 +346,11 @@ public void testInactivityPeriodWithLateNotify() throws 
Exception {
                                        Assert.assertEquals("test2@1\n", 
fileContents.getValue());
                                } else if 
(fileContents.getKey().getParentFile().getName().equals("test3")) {
                                        bucketCounter++;
-                                       Assert.assertEquals("part-0-0", 
fileContents.getKey().getName());
+                                       Assert.assertEquals("part-0-2", 
fileContents.getKey().getName());
                                        Assert.assertEquals("test3@1\n", 
fileContents.getValue());
                                } else if 
(fileContents.getKey().getParentFile().getName().equals("test4")) {
                                        bucketCounter++;
-                                       Assert.assertEquals("part-0-0", 
fileContents.getKey().getName());
+                                       Assert.assertEquals("part-0-3", 
fileContents.getKey().getName());
                                        Assert.assertEquals("test4@1\n", 
fileContents.getValue());
                                }
                        }
@@ -437,8 +437,8 @@ public void testScalingDownAndMergingOfStates() throws 
Exception {
                                                        
inProgressFilename.contains(".part-1-0.inprogress")
                                                )
                                ) {
-                                               counter++;
-                               } else if (parentFilename.equals("test2") && 
inProgressFilename.contains(".part-1-0.inprogress")) {
+                                       counter++;
+                               } else if (parentFilename.equals("test2") && 
inProgressFilename.contains(".part-1-1.inprogress")) {
                                        counter++;
                                }
                        }
@@ -476,7 +476,7 @@ public void testScalingDownAndMergingOfStates() throws 
Exception {
                                                counter++;
                                                
Assert.assertTrue(fileContents.getValue().equals("test1@1\n") || 
fileContents.getValue().equals("test1@0\n"));
                                        }
-                               } else if (parentFilename.equals("test2") && 
filename.contains(".part-1-0.inprogress")) {
+                               } else if (parentFilename.equals("test2") && 
filename.contains(".part-1-1.inprogress")) {
                                        counter++;
                                        Assert.assertEquals("test2@1\n", 
fileContents.getValue());
                                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index db54de941b9..f16a9085d9d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -25,12 +25,19 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.equalTo;
 
 /**
  * Tests for different {@link RollingPolicy rolling policies}.
@@ -134,24 +141,74 @@ public void testRollOnCheckpointPolicy() throws Exception 
{
                        // we take a checkpoint so we roll.
                        testHarness.snapshot(1L, 1L);
 
+                       for (File file: FileUtils.listFiles(outDir, null, 
true)) {
+                               if 
(Objects.equals(file.getParentFile().getName(), "test1")) {
+                                       
Assert.assertTrue(file.getName().contains(".part-0-1.inprogress."));
+                               } else if 
(Objects.equals(file.getParentFile().getName(), "test2")) {
+                                       
Assert.assertTrue(file.getName().contains(".part-0-0.inprogress."));
+                               }
+                       }
+
                        // this will create a new part file
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 4), 4L));
                        TestUtils.checkLocalFs(outDir, 3, 0);
 
+                       testHarness.notifyOfCompletedCheckpoint(1L);
+                       for (File file: FileUtils.listFiles(outDir, null, 
true)) {
+                               if 
(Objects.equals(file.getParentFile().getName(), "test1")) {
+                                       Assert.assertTrue(
+                                                       
file.getName().contains(".part-0-2.inprogress.") || 
file.getName().equals("part-0-1")
+                                       );
+                               } else if 
(Objects.equals(file.getParentFile().getName(), "test2")) {
+                                       Assert.assertEquals("part-0-0", 
file.getName());
+                               }
+                       }
+
                        // and open and fill .part-0-2.inprogress
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 5), 5L));
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 6), 6L));
-                       TestUtils.checkLocalFs(outDir, 3, 0);                   
 // nothing committed yet
+                       TestUtils.checkLocalFs(outDir, 1, 2);
 
                        // we take a checkpoint so we roll.
                        testHarness.snapshot(2L, 2L);
 
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test2", 7), 7L));
-                       TestUtils.checkLocalFs(outDir, 4, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 2);
+
+                       for (File file: FileUtils.listFiles(outDir, null, 
true)) {
+                               if 
(Objects.equals(file.getParentFile().getName(), "test1")) {
+                                       Assert.assertThat(
+                                                       file.getName(),
+                                                       
either(containsString(".part-0-2.inprogress."))
+                                                                       
.or(equalTo("part-0-1"))
+                                       );
+                               } else if 
(Objects.equals(file.getParentFile().getName(), "test2")) {
+                                       Assert.assertThat(
+                                                       file.getName(),
+                                                       
either(containsString(".part-0-3.inprogress."))
+                                                                       
.or(equalTo("part-0-0"))
+                                       );
+                               }
+                       }
 
                        // we acknowledge the last checkpoint so we should 
publish all but the latest in-progress file
                        testHarness.notifyOfCompletedCheckpoint(2L);
+
                        TestUtils.checkLocalFs(outDir, 1, 3);
+                       for (File file: FileUtils.listFiles(outDir, null, 
true)) {
+                               if 
(Objects.equals(file.getParentFile().getName(), "test1")) {
+                                       Assert.assertThat(
+                                                       file.getName(),
+                                                       
either(equalTo("part-0-2")).or(equalTo("part-0-1"))
+                                       );
+                               } else if 
(Objects.equals(file.getParentFile().getName(), "test2")) {
+                                       Assert.assertThat(
+                                                       file.getName(),
+                                                       
either(containsString(".part-0-3.inprogress."))
+                                                                       
.or(equalTo("part-0-0"))
+                                       );
+                               }
+                       }
                }
        }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to