[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15695560#comment-15695560 ]
ASF GitHub Bot commented on FLINK-5096: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2845#discussion_r89597858 --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java --- @@ -638,6 +644,239 @@ public void flatMap(Tuple2<Integer, String> value, Assert.assertEquals(8, numFiles); } + private static final String PART_PREFIX = "part"; + private static final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; + private static final String VALID_LENGTH_SUFFIX = ".valid"; + + @Test + public void testBucketStateTransitions() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(0L); + + // we have a bucket size of 5 bytes, so each record will get its own bucket, + // i.e. the bucket should roll after every record. + + testHarness.processElement(new StreamRecord<>("test1", 1L)); + testHarness.processElement(new StreamRecord<>("test2", 1L)); + checkFs(outDir, 1, 1 ,0, 0); + + testHarness.processElement(new StreamRecord<>("test3", 1L)); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.snapshot(0, 0); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.notifyOfCompletedCheckpoint(0); + checkFs(outDir, 1, 0, 2, 0); + + OperatorStateHandles snapshot = testHarness.snapshot(1, 0); + + testHarness.close(); + checkFs(outDir, 0, 1, 2, 0); + + testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + checkFs(outDir, 0, 0, 3, 1); + + snapshot = testHarness.snapshot(2, 0); + + testHarness.processElement(new StreamRecord<>("test4", 10)); + checkFs(outDir, 1, 0, 3, 1); + + testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + // the in-progress file remains as we do not clean up now + checkFs(outDir, 1, 0, 3, 1); + + testHarness.close(); + + // at close it is not moved to final because it is not part + // of the current task's state, it was just a not cleaned up leftover. + checkFs(outDir, 1, 0, 3, 1); + } + + @Test + public void testScalingDown() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1); + testHarness2.setup(); + testHarness2.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2); + testHarness3.setup(); + testHarness3.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + checkFs(outDir, 1, 0, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test2", 0L)); + checkFs(outDir, 2, 0, 0, 0); + + testHarness3.processElement(new StreamRecord<>("test3", 0L)); + testHarness3.processElement(new StreamRecord<>("test4", 0L)); + checkFs(outDir, 3, 1, 0, 0); + + // intentionally we snapshot them in a not ascending order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness3.snapshot(0, 0), + testHarness1.snapshot(0, 0), + testHarness2.snapshot(0, 0) + ); + + //with the above state reshuffling, we expect the new testHarness1 + // to take the state of the previous testHarness3 and testHarness2 --- End diff -- doesn't harness1 receive the state of 3 and 1? What do you think about having the testHarness2 process 5 elements? This way one could always accurately deduce which of the subsequent harnesses got which state. > Make the RollingSink rescalable. > -------------------------------- > > Key: FLINK-5096 > URL: https://issues.apache.org/jira/browse/FLINK-5096 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Integrate the RollingSink with the new state abstractions so that its > parallelism can change after restoring from a savepoint. -- This message was sent by Atlassian JIRA (v6.3.4#6332)