[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643900#comment-15643900 ]
ASF GitHub Bot commented on FLINK-5020:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2759#discussion_r86754225
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---
@@ -158,17 +160,151 @@ public void testDataDiscardingUponRestore() throws Exception {
testHarness = new OneInputStreamOperatorTestHarness<>(sink);
testHarness.setup();
- testHarness.restore(latestSnapshot);
+ testHarness.initializeState(latestSnapshot);
testHarness.open();
for (int x = 0; x < 20; x++) {
testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
elementCounter++;
}
- testHarness.snapshotLegacy(snapshotCount++, 0);
+ testHarness.snapshot(snapshotCount++, 0);
testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
- verifyResultsDataDiscardingUponRestore(testHarness, sink);
+ verifyResultsDataDiscardingUponRestore(sink);
+ }
+
+ @Test
+ public void testScalingDown() throws Exception {
+ S sink1 = createSink();
+ OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+ new OneInputStreamOperatorTestHarness<>(sink1, 10, 2, 0);
+ testHarness1.open();
+
+ S sink2 = createSink();
+ OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
+ new OneInputStreamOperatorTestHarness<>(sink2, 10, 2, 1);
+ testHarness2.open();
+
+ int elementCounter = 1;
+ int snapshotCount = 0;
+
+ for (int x = 0; x < 10; x++) {
+ testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+ elementCounter++;
+ }
+
+ for (int x = 0; x < 11; x++) {
+ testHarness2.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+ elementCounter++;
+ }
+
+ // snapshot at checkpoint 0 for testHarness1 and testHarness2
+ OperatorStateHandles snapshot1 = testHarness1.snapshot(snapshotCount, 0);
+ OperatorStateHandles snapshot2 = testHarness2.snapshot(snapshotCount, 0);
+
+ // merge the two partial states
+ OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness
+ .repackageState(snapshot1, snapshot2);
+
+ testHarness1.close();
+ testHarness2.close();
+
+ // and create a third instance that operates alone but
+ // has the merged state of the previous 2 instances
+
+ S sink3 = createSink();
+ OneInputStreamOperatorTestHarness<IN, IN> mergedTestHarness =
+ new OneInputStreamOperatorTestHarness<>(sink3, 10, 1, 0);
+
+ mergedTestHarness.setup();
+ mergedTestHarness.initializeState(mergedSnapshot);
+ mergedTestHarness.open();
+
+ for (int x = 0; x < 12; x++) {
+ mergedTestHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+ elementCounter++;
+ }
+
+ snapshotCount++;
+ mergedTestHarness.snapshot(snapshotCount, 1);
+ mergedTestHarness.notifyOfCompletedCheckpoint(snapshotCount);
+
+ verifyResultsWhenScalingDown(sink3);
+ mergedTestHarness.close();
+ }
+
+ @Test
+ public void testScalingUp() throws Exception {
+
+ S sink1 = createSink();
+ OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+ new OneInputStreamOperatorTestHarness<>(sink1, 10, 1, 0);
+
+ int elementCounter = 1;
+ int snapshotCount = 0;
+
+ testHarness1.open();
+
+ // put two more checkpoints as pending
+
+ for (int x = 0; x < 10; x++) {
+ testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+ elementCounter++;
+ }
+ testHarness1.snapshot(++snapshotCount, 0);
+
+ for (int x = 0; x < 11; x++) {
+ testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+ elementCounter++;
+ }
+
+ // this will be the state that will be split between the two new operators
+ OperatorStateHandles snapshot = testHarness1.snapshot(++snapshotCount, 0);
+
+ testHarness1.close();
+
+ // verify no elements are in the sink
+ verifyResultsWhenScalingUp(sink1, 0, -1);
--- End diff ---
this isn't necessary
> Make the GenericWriteAheadSink rescalable.
> ------------------------------------------
>
> Key: FLINK-5020
> URL: https://issues.apache.org/jira/browse/FLINK-5020
> Project: Flink
> Issue Type: Improvement
> Components: Cassandra Connector
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> This targets integrating the GenericWriteAheadSink with the new rescalable
> state abstractions so that the parallelism of the operator can change
> arbitrarily without jeopardizing the guarantees offered by it.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)