[
https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974161#comment-15974161
]
ASF GitHub Bot commented on FLINK-4821:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3001#discussion_r112123126
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
---
@@ -559,48 +699,298 @@ public void
testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoi
@Test
@SuppressWarnings("unchecked")
+ public void
testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws
Exception {
+ HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState =
getFakeRestoredStore("all");
+
+ KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
+ List<KinesisStreamShard> shards = new ArrayList<>();
+ shards.addAll(fakeRestoredState.keySet());
+
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
+
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+
+ // assume the given config is correct
+ PowerMockito.mockStatic(KinesisConfigUtil.class);
+ PowerMockito.doNothing().when(KinesisConfigUtil.class);
+
+ TestableFlinkKinesisConsumer consumer = new
TestableFlinkKinesisConsumer(
+ "fakeStream", new Properties(), 10, 2);
+ consumer.restoreState(fakeRestoredState);
+ consumer.open(new Configuration());
+ consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+ Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+ for (Map.Entry<KinesisStreamShard, SequenceNumber>
restoredShard : fakeRestoredState.entrySet()) {
+
Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
+ restoredShard.getKey().getStreamName(),
restoredShard.getKey().getShard().getShardId());
+
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
+ new
KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
public void
testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
+ //
----------------------------------------------------------------------
+ // setting initial state
+ //
----------------------------------------------------------------------
+ HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState =
getFakeRestoredStore("all");
+
+ //
----------------------------------------------------------------------
+ // mock operator state backend and initial state for
initializeState()
+ //
----------------------------------------------------------------------
+ TestingListState<Serializable> listState = new
TestingListState<>();
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> state:
fakeRestoredState.entrySet()) {
+ listState.add(Tuple2.of(state.getKey(),
state.getValue()));
+ }
+
+ OperatorStateStore operatorStateStore =
mock(OperatorStateStore.class);
+
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+ StateInitializationContext initializationContext =
mock(StateInitializationContext.class);
+
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+ when(initializationContext.isRestored()).thenReturn(true);
+
+ //
----------------------------------------------------------------------
+ // mock fetcher
+ //
----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
+ List<KinesisStreamShard> shards = new ArrayList<>();
+ shards.addAll(fakeRestoredState.keySet());
+
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
- HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState =
new HashMap<>();
- fakeRestoredState.put(
- new KinesisStreamShard("fakeStream1",
- new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
- new SequenceNumber(UUID.randomUUID().toString()));
- fakeRestoredState.put(
- new KinesisStreamShard("fakeStream1",
- new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
- new SequenceNumber(UUID.randomUUID().toString()));
- fakeRestoredState.put(
- new KinesisStreamShard("fakeStream1",
- new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
- new SequenceNumber(UUID.randomUUID().toString()));
- fakeRestoredState.put(
- new KinesisStreamShard("fakeStream2",
- new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
- new SequenceNumber(UUID.randomUUID().toString()));
- fakeRestoredState.put(
- new KinesisStreamShard("fakeStream2",
- new
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
- new SequenceNumber(UUID.randomUUID().toString()));
+ //
----------------------------------------------------------------------
+ // start to test seed initial state to fetcher
+ //
----------------------------------------------------------------------
+ TestableFlinkKinesisConsumer consumer = new
TestableFlinkKinesisConsumer(
+ "fakeStream", new Properties(), 10, 2);
+ consumer.initializeState(initializationContext);
+ consumer.open(new Configuration());
+ consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+ Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+ for (Map.Entry<KinesisStreamShard, SequenceNumber>
restoredShard : fakeRestoredState.entrySet()) {
+
Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
+ restoredShard.getKey().getStreamName(),
restoredShard.getKey().getShard().getShardId());
+
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
+ new
KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws
Exception {
+ //
----------------------------------------------------------------------
+ // setting initial state
+ //
----------------------------------------------------------------------
+ HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState =
getFakeRestoredStore("fakeStream1");
+
+ HashMap<KinesisStreamShard, SequenceNumber>
fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
+
+ //
----------------------------------------------------------------------
+ // mock operator state backend and initial state for
initializeState()
+ //
----------------------------------------------------------------------
+ TestingListState<Serializable> listState = new
TestingListState<>();
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> state:
fakeRestoredState.entrySet()) {
+ listState.add(Tuple2.of(state.getKey(),
state.getValue()));
+ }
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> state:
fakeRestoredStateForOthers.entrySet()) {
+ listState.add(Tuple2.of(state.getKey(),
state.getValue()));
+ }
+ OperatorStateStore operatorStateStore =
mock(OperatorStateStore.class);
+
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+ StateInitializationContext initializationContext =
mock(StateInitializationContext.class);
+
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+ when(initializationContext.isRestored()).thenReturn(true);
+
+ //
----------------------------------------------------------------------
+ // mock fetcher
+ //
----------------------------------------------------------------------
+ KinesisDataFetcher mockedFetcher =
Mockito.mock(KinesisDataFetcher.class);
+ List<KinesisStreamShard> shards = new ArrayList<>();
+ shards.addAll(fakeRestoredState.keySet());
+
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
+
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+
+ // assume the given config is correct
+ PowerMockito.mockStatic(KinesisConfigUtil.class);
+ PowerMockito.doNothing().when(KinesisConfigUtil.class);
+
+ //
----------------------------------------------------------------------
+ // start to test seed initial state to fetcher
+ //
----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new
TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
- consumer.restoreState(fakeRestoredState);
+ consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+ for (Map.Entry<KinesisStreamShard, SequenceNumber>
restoredShard : fakeRestoredStateForOthers.entrySet()) {
+ // should never get restored state not belonging to
itself
+ Mockito.verify(mockedFetcher,
never()).advanceLastDiscoveredShardOfStream(
+ restoredShard.getKey().getStreamName(),
restoredShard.getKey().getShard().getShardId());
+ Mockito.verify(mockedFetcher,
never()).registerNewSubscribedShardState(
+ new
KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+ }
for (Map.Entry<KinesisStreamShard, SequenceNumber>
restoredShard : fakeRestoredState.entrySet()) {
+ // should get restored state belonging to itself
Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
restoredShard.getKey().getStreamName(),
restoredShard.getKey().getShard().getShardId());
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new
KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
}
}
+
+ @Test
+ @SuppressWarnings("unchecked")
--- End diff --
These annotations should be placed after the comment block. I think that's the
usual convention.
> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
> Key: FLINK-4821
> URL: https://issues.apache.org/jira/browse/FLINK-4821
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the
> implementation for the Kafka connector.
> The AWS Kinesis connector would also benefit from this feature and should
> implement it too. This ticket tracks the progress of that work.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)