[
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668574#comment-15668574
]
ASF GitHub Bot commented on FLINK-5056:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2797#discussion_r88124075
--- Diff:
flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
---
@@ -118,48 +150,290 @@ public static void destroyHDFS() {
}
@Test
- public void testCheckpointWithoutNotify() throws Exception {
- File dataDir = tempFolder.newFolder();
+ public void testInactivityPeriodWithLateNotify() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
createRescalingTestSink(outDir, 1, 0, 100);
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(0L);
+
+ testHarness.processElement(new StreamRecord<>("test1", 1L));
+ testHarness.processElement(new StreamRecord<>("test2", 1L));
+ checkFs(outDir, 2, 0 ,0, 0);
+
+ testHarness.setProcessingTime(101L); // put some in pending
+ checkFs(outDir, 0, 2, 0, 0);
+
+ testHarness.snapshot(0, 0); // put them in pending for 0
+ checkFs(outDir, 0, 2, 0, 0);
- OneInputStreamOperatorTestHarness<String, Object> testHarness =
createTestSink(dataDir);
+ testHarness.processElement(new StreamRecord<>("test3", 1L));
+ testHarness.processElement(new StreamRecord<>("test4", 1L));
+ testHarness.setProcessingTime(202L); // put some in pending
+
+ testHarness.snapshot(1, 0); // put them in pending for 1
+ checkFs(outDir, 0, 4, 0, 0);
+
+ testHarness.notifyOfCompletedCheckpoint(0); // put the pending for 0 to the "committed" state
+ checkFs(outDir, 0, 2, 2, 0);
+
+ testHarness.notifyOfCompletedCheckpoint(1); // put the pending for 1 to the "committed" state
+ checkFs(outDir, 0, 0, 4, 0);
+ }
+
+ @Test
+ public void testBucketStateTransitions() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
createRescalingTestSink(outDir, 1, 0, 100);
testHarness.setup();
testHarness.open();
- testHarness.processElement(new StreamRecord<>("Hello"));
- testHarness.processElement(new StreamRecord<>("Hello"));
- testHarness.processElement(new StreamRecord<>("Hello"));
+ testHarness.setProcessingTime(0L);
+
+ testHarness.processElement(new StreamRecord<>("test1", 1L));
+ testHarness.processElement(new StreamRecord<>("test2", 1L));
+ checkFs(outDir, 2, 0 ,0, 0);
+
+ // this is to check the inactivity threshold
+ testHarness.setProcessingTime(101L);
+ checkFs(outDir, 0, 2, 0, 0);
+
+ testHarness.processElement(new StreamRecord<>("test3", 1L));
+ checkFs(outDir, 1, 2, 0, 0);
+
+ testHarness.snapshot(0, 0);
+ checkFs(outDir, 1, 2, 0, 0);
- testHarness.setProcessingTime(10000L);
+ testHarness.notifyOfCompletedCheckpoint(0);
+ checkFs(outDir, 1, 0, 2, 0);
- // snapshot but don't call notify to simulate a notify that never
- // arrives, the sink should move pending files in restore() in that case
- StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0);
+ OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
- testHarness = createTestSink(dataDir);
+ testHarness.close();
+ checkFs(outDir, 0, 1, 2, 0);
+
+ testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+ checkFs(outDir, 0, 0, 3, 1);
+
+ snapshot = testHarness.snapshot(2, 0);
+
+ testHarness.processElement(new StreamRecord<>("test4", 10));
+ checkFs(outDir, 1, 0, 3, 1);
+
+ testHarness = createRescalingTestSink(outDir, 1, 0, 100);
testHarness.setup();
- testHarness.restore(snapshot1);
+ testHarness.initializeState(snapshot);
testHarness.open();
- testHarness.processElement(new StreamRecord<>("Hello"));
+ // the in-progress file remains as we do not clean up now
+ checkFs(outDir, 1, 0, 3, 1);
testHarness.close();
- int numComplete = 0;
- int numPending = 0;
- for (File file: FileUtils.listFiles(dataDir, null, true)) {
+ // at close it is not moved to final because it is not part
+ // of the current task's state, it was just a not cleaned up leftover.
+ checkFs(outDir, 1, 0, 3, 1);
+ }
+
+ @Test
+ public void testSameParallelismWithShufflingStates() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1
= createRescalingTestSink(outDir, 2, 0, 100);
+ testHarness1.setup();
+ testHarness1.open();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness2
= createRescalingTestSink(outDir, 2, 1, 100);
+ testHarness2.setup();
+ testHarness2.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ checkFs(outDir, 1, 0, 0, 0);
+
+ testHarness2.processElement(new StreamRecord<>("test2", 0L));
+ checkFs(outDir, 2, 0, 0, 0);
+
+ // intentionally we snapshot them in the reverse order so that the states are shuffled
+ OperatorStateHandles mergedSnapshot =
AbstractStreamOperatorTestHarness.repackageState(
+ testHarness2.snapshot(0, 0),
+ testHarness1.snapshot(0, 0)
+ );
+
+ checkFs(outDir, 2, 0, 0, 0);
+
+ // this will not be included in any checkpoint so it can be cleaned up (although we do not)
+ testHarness2.processElement(new StreamRecord<>("test3", 0L));
+ checkFs(outDir, 3, 0, 0, 0);
+
+ testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
+ testHarness1.setup();
+ testHarness1.initializeState(mergedSnapshot);
+ testHarness1.open();
+
+ // the one in-progress will be the one assigned to the next instance,
+ // the other is the test3 which is just not cleaned up
+ checkFs(outDir, 2, 0, 1, 1);
+
+ testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
+ testHarness2.setup();
+ testHarness2.initializeState(mergedSnapshot);
+ testHarness2.open();
+
+ checkFs(outDir, 1, 0, 2, 2);
+
+ testHarness1.close();
+ testHarness2.close();
+
+ // the 1 in-progress can be discarded.
+ checkFs(outDir, 1, 0, 2, 2);
+ }
+
+ @Test
+ public void testScalingDown() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1
= createRescalingTestSink(outDir, 3, 0, 100);
+ testHarness1.setup();
+ testHarness1.open();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness2
= createRescalingTestSink(outDir, 3, 1, 100);
+ testHarness2.setup();
+ testHarness2.open();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness3
= createRescalingTestSink(outDir, 3, 2, 100);
+ testHarness3.setup();
+ testHarness3.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 0L));
+ checkFs(outDir, 1, 0, 0, 0);
+
+ testHarness2.processElement(new StreamRecord<>("test2", 0L));
+ checkFs(outDir, 2, 0, 0, 0);
+
+ testHarness3.processElement(new StreamRecord<>("test3", 0L));
+ testHarness3.processElement(new StreamRecord<>("test4", 0L));
+ checkFs(outDir, 4, 0, 0, 0);
+
+ // intentionally we snapshot them in the reverse order so that the states are shuffled
+ OperatorStateHandles mergedSnapshot =
AbstractStreamOperatorTestHarness.repackageState(
+ testHarness3.snapshot(0, 0),
+ testHarness1.snapshot(0, 0),
+ testHarness2.snapshot(0, 0)
+ );
+
+ testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
+ testHarness1.setup();
+ testHarness1.initializeState(mergedSnapshot);
+ testHarness1.open();
+
+ checkFs(outDir, 1, 0, 3, 3);
+
+ testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
+ testHarness2.setup();
+ testHarness2.initializeState(mergedSnapshot);
+ testHarness2.open();
+
+ checkFs(outDir, 0, 0, 4, 4);
+ }
+
+ @Test
+ public void testScalingUp() throws Exception {
+ final File outDir = tempFolder.newFolder();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness1
= createRescalingTestSink(outDir, 2, 0, 100);
+ testHarness1.setup();
+ testHarness1.open();
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness2
= createRescalingTestSink(outDir, 2, 0, 100);
+ testHarness2.setup();
+ testHarness2.open();
+
+ testHarness1.processElement(new StreamRecord<>("test1", 1L));
+ testHarness1.processElement(new StreamRecord<>("test2", 1L));
+
+ checkFs(outDir, 2, 0, 0, 0);
+
+ testHarness2.processElement(new StreamRecord<>("test3", 1L));
+ testHarness2.processElement(new StreamRecord<>("test4", 1L));
+ testHarness2.processElement(new StreamRecord<>("test5", 1L));
+
+ checkFs(outDir, 5, 0, 0, 0);
+
+ // intentionally we snapshot them in the reverse order so that the states are shuffled
+ OperatorStateHandles mergedSnapshot =
AbstractStreamOperatorTestHarness.repackageState(
+ testHarness2.snapshot(0, 0),
+ testHarness1.snapshot(0, 0)
+ );
+
+ testHarness1 = createRescalingTestSink(outDir, 3, 0, 100);
+ testHarness1.setup();
+ testHarness1.initializeState(mergedSnapshot);
+ testHarness1.open();
+
+ checkFs(outDir, 2, 0, 3, 3);
+
+ testHarness2 = createRescalingTestSink(outDir, 3, 1, 100);
+ testHarness2.setup();
+ testHarness2.initializeState(mergedSnapshot);
+ testHarness2.open();
+
+ checkFs(outDir, 0, 0, 5, 5);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness3
= createRescalingTestSink(outDir, 3, 2, 100);
+ testHarness3.setup();
+ testHarness3.initializeState(mergedSnapshot);
+ testHarness3.open();
+
--- End diff --
Let's add a separate check here to make sure that nothing was changed just by the initialization.
> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
> Issue Type: Bug
> Components: filesystem-connector
> Affects Versions: 1.1.3
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a
> notification about a previous checkpoint arrives, it clears its state. This
> can
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the
> problem:
> -> input data
> -> snapshot(0)
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the
> data
> that arrived for checkpoint 1.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)