[
https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643923#comment-15643923
]
ASF GitHub Bot commented on FLINK-5020:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2759#discussion_r86757277
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
---
@@ -158,17 +160,151 @@ public void testDataDiscardingUponRestore() throws
Exception {
testHarness = new OneInputStreamOperatorTestHarness<>(sink);
testHarness.setup();
- testHarness.restore(latestSnapshot);
+ testHarness.initializeState(latestSnapshot);
testHarness.open();
for (int x = 0; x < 20; x++) {
testHarness.processElement(new
StreamRecord<>(generateValue(elementCounter, 2)));
elementCounter++;
}
- testHarness.snapshotLegacy(snapshotCount++, 0);
+ testHarness.snapshot(snapshotCount++, 0);
testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
- verifyResultsDataDiscardingUponRestore(testHarness, sink);
+ verifyResultsDataDiscardingUponRestore(sink);
+ }
+
+ @Test
+ public void testScalingDown() throws Exception {
+ S sink1 = createSink();
+ OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+ new OneInputStreamOperatorTestHarness<>(sink1, 10, 2,
0);
+ testHarness1.open();
+
+ S sink2 = createSink();
+ OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
+ new OneInputStreamOperatorTestHarness<>(sink2, 10, 2,
1);
+ testHarness2.open();
+
+ int elementCounter = 1;
--- End diff --
leave it as it is then
> Make the GenericWriteAheadSink rescalable.
> ------------------------------------------
>
> Key: FLINK-5020
> URL: https://issues.apache.org/jira/browse/FLINK-5020
> Project: Flink
> Issue Type: Improvement
> Components: Cassandra Connector
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> This targets integrating the GenericWriteAheadSink with the new rescalable
> state abstractions so that the parallelism of the operator can change
> arbitrarily without jeopardizing the guarantees offered by it.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)