asfgit closed pull request #6466: [FLINK-10005][DataStream API]
StreamingFileSink: sets initialPartCounter=maxUsed in new Buckets
URL: https://github.com/apache/flink/pull/6466
This is a PR merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below
for the sake of provenance.
Since this is a foreign pull request (from a fork), the diff would not
otherwise be shown, due to how GitHub displays merged fork PRs:
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index ec59233c0e5..a350096e38b 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -115,6 +115,18 @@ public Bucket(
this.pending = new ArrayList<>();
}
+ /**
+ * Gets the information available for the currently
+ * open part file, i.e. the one we are currently writing to.
+ *
+ * <p>This will be null if there is no currently open part file. This
+ * is the case when we have a new, just created bucket or a bucket
+ * that has not received any data after the closing of its previously
+ * open in-progress file due to the specified rolling policy.
+ *
+ * @return The information about the currently in-progress part file
+ * or {@code null} if there is no open part file.
+ */
public PartFileInfo<BucketID> getInProgressPartInfo() {
return currentPart;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 7e9dd61e035..e62c425fc2f 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -70,8 +70,6 @@
private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
- private long initMaxPartCounter;
-
private long maxPartCounterUsed;
private final RecoverableWriter fileSystemWriter;
@@ -114,7 +112,6 @@
bucketer.getSerializer()
);
- this.initMaxPartCounter = 0L;
this.maxPartCounterUsed = 0L;
}
@@ -137,7 +134,7 @@ void initializeState(final ListState<byte[]> bucketStates,
final ListState<Long>
for (long partCounter: partCounterState.get()) {
maxCounter = Math.max(partCounter, maxCounter);
}
- initMaxPartCounter = maxCounter;
+ maxPartCounterUsed = maxCounter;
// get the restored buckets
for (byte[] recoveredState : bucketStates.get()) {
@@ -151,7 +148,7 @@ void initializeState(final ListState<byte[]> bucketStates,
final ListState<Long>
final Bucket<IN, BucketID> restoredBucket =
bucketFactory.restoreBucket(
fileSystemWriter,
subtaskIndex,
- initMaxPartCounter,
+ maxPartCounterUsed,
partFileWriterFactory,
bucketState
);
@@ -200,8 +197,6 @@ void snapshotState(
final PartFileInfo<BucketID> info =
bucket.getInProgressPartInfo();
if (info != null &&
rollingPolicy.shouldRollOnCheckpoint(info)) {
- // we also check here so that we do not have to
always
- // wait for the "next" element to arrive.
bucket.closePartFile();
}
@@ -237,13 +232,19 @@ void onElement(IN value, SinkFunction.Context context)
throws Exception {
subtaskIndex,
bucketId,
bucketPath,
- initMaxPartCounter,
+ maxPartCounterUsed,
partFileWriterFactory);
activeBuckets.put(bucketId, bucket);
}
final PartFileInfo<BucketID> info =
bucket.getInProgressPartInfo();
if (info == null || rollingPolicy.shouldRollOnEvent(info,
value)) {
+
+ // info will be null if there is no currently open part
file. This
+ // is the case when we have a new, just created bucket
or a bucket
+ // that has not received any data after the closing of
its previously
+ // open in-progress file due to the specified rolling
policy.
+
bucket.rollPartFile(currentProcessingTime);
}
bucket.write(value, currentProcessingTime);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 4b1e7436772..a0c438e1847 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -323,7 +323,7 @@ public void testInactivityPeriodWithLateNotify() throws
Exception {
Assert.assertEquals("test1@1\n",
fileContents.getValue());
} else if
(fileContents.getKey().getParentFile().getName().equals("test2")) {
bucketCounter++;
- Assert.assertEquals("part-0-0",
fileContents.getKey().getName());
+ Assert.assertEquals("part-0-1",
fileContents.getKey().getName());
Assert.assertEquals("test2@1\n",
fileContents.getValue());
} else if
(fileContents.getKey().getParentFile().getName().equals("test3")) {
bucketCounter++;
@@ -346,11 +346,11 @@ public void testInactivityPeriodWithLateNotify() throws
Exception {
Assert.assertEquals("test2@1\n",
fileContents.getValue());
} else if
(fileContents.getKey().getParentFile().getName().equals("test3")) {
bucketCounter++;
- Assert.assertEquals("part-0-0",
fileContents.getKey().getName());
+ Assert.assertEquals("part-0-2",
fileContents.getKey().getName());
Assert.assertEquals("test3@1\n",
fileContents.getValue());
} else if
(fileContents.getKey().getParentFile().getName().equals("test4")) {
bucketCounter++;
- Assert.assertEquals("part-0-0",
fileContents.getKey().getName());
+ Assert.assertEquals("part-0-3",
fileContents.getKey().getName());
Assert.assertEquals("test4@1\n",
fileContents.getValue());
}
}
@@ -437,8 +437,8 @@ public void testScalingDownAndMergingOfStates() throws
Exception {
inProgressFilename.contains(".part-1-0.inprogress")
)
) {
- counter++;
- } else if (parentFilename.equals("test2") &&
inProgressFilename.contains(".part-1-0.inprogress")) {
+ counter++;
+ } else if (parentFilename.equals("test2") &&
inProgressFilename.contains(".part-1-1.inprogress")) {
counter++;
}
}
@@ -476,7 +476,7 @@ public void testScalingDownAndMergingOfStates() throws
Exception {
counter++;
Assert.assertTrue(fileContents.getValue().equals("test1@1\n") ||
fileContents.getValue().equals("test1@0\n"));
}
- } else if (parentFilename.equals("test2") &&
filename.contains(".part-1-0.inprogress")) {
+ } else if (parentFilename.equals("test2") &&
filename.contains(".part-1-1.inprogress")) {
counter++;
Assert.assertEquals("test2@1\n",
fileContents.getValue());
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index db54de941b9..f16a9085d9d 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -25,12 +25,19 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.util.Objects;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.equalTo;
/**
* Tests for different {@link RollingPolicy rolling policies}.
@@ -134,24 +141,74 @@ public void testRollOnCheckpointPolicy() throws Exception
{
// we take a checkpoint so we roll.
testHarness.snapshot(1L, 1L);
+ for (File file: FileUtils.listFiles(outDir, null,
true)) {
+ if
(Objects.equals(file.getParentFile().getName(), "test1")) {
+
Assert.assertTrue(file.getName().contains(".part-0-1.inprogress."));
+ } else if
(Objects.equals(file.getParentFile().getName(), "test2")) {
+
Assert.assertTrue(file.getName().contains(".part-0-0.inprogress."));
+ }
+ }
+
// this will create a new part file
testHarness.processElement(new
StreamRecord<>(Tuple2.of("test1", 4), 4L));
TestUtils.checkLocalFs(outDir, 3, 0);
+ testHarness.notifyOfCompletedCheckpoint(1L);
+ for (File file: FileUtils.listFiles(outDir, null,
true)) {
+ if
(Objects.equals(file.getParentFile().getName(), "test1")) {
+ Assert.assertTrue(
+
file.getName().contains(".part-0-2.inprogress.") ||
file.getName().equals("part-0-1")
+ );
+ } else if
(Objects.equals(file.getParentFile().getName(), "test2")) {
+ Assert.assertEquals("part-0-0",
file.getName());
+ }
+ }
+
// and open and fill .part-0-2.inprogress
testHarness.processElement(new
StreamRecord<>(Tuple2.of("test1", 5), 5L));
testHarness.processElement(new
StreamRecord<>(Tuple2.of("test1", 6), 6L));
- TestUtils.checkLocalFs(outDir, 3, 0);
// nothing committed yet
+ TestUtils.checkLocalFs(outDir, 1, 2);
// we take a checkpoint so we roll.
testHarness.snapshot(2L, 2L);
testHarness.processElement(new
StreamRecord<>(Tuple2.of("test2", 7), 7L));
- TestUtils.checkLocalFs(outDir, 4, 0);
+ TestUtils.checkLocalFs(outDir, 2, 2);
+
+ for (File file: FileUtils.listFiles(outDir, null,
true)) {
+ if
(Objects.equals(file.getParentFile().getName(), "test1")) {
+ Assert.assertThat(
+ file.getName(),
+
either(containsString(".part-0-2.inprogress."))
+
.or(equalTo("part-0-1"))
+ );
+ } else if
(Objects.equals(file.getParentFile().getName(), "test2")) {
+ Assert.assertThat(
+ file.getName(),
+
either(containsString(".part-0-3.inprogress."))
+
.or(equalTo("part-0-0"))
+ );
+ }
+ }
// we acknowledge the last checkpoint so we should
publish all but the latest in-progress file
testHarness.notifyOfCompletedCheckpoint(2L);
+
TestUtils.checkLocalFs(outDir, 1, 3);
+ for (File file: FileUtils.listFiles(outDir, null,
true)) {
+ if
(Objects.equals(file.getParentFile().getName(), "test1")) {
+ Assert.assertThat(
+ file.getName(),
+
either(equalTo("part-0-2")).or(equalTo("part-0-1"))
+ );
+ } else if
(Objects.equals(file.getParentFile().getName(), "test2")) {
+ Assert.assertThat(
+ file.getName(),
+
either(containsString(".part-0-3.inprogress."))
+
.or(equalTo("part-0-0"))
+ );
+ }
+ }
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services