Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5337#discussion_r163176506

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
    @@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
             assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
         }

    +    /**
    +     * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
    +     * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
    +     * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
    +     * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
    +     * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
    +     */
    +    @Test
    +    public void testFindSequenceNumberToRestoreFrom() {
    --- End diff --

    Nice that a unit test is added for the method!
    However, I think it would be even better if we also include a test which uses the `AbstractStreamOperatorTestHarness` to test this restore. That utility class allows taking a snapshot of the wrapped operator's state, and restoring it again with the snapshot.
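    For reference, the corner case described in the Javadoc above can be sketched without any Flink dependencies. This is only a rough illustration, not the code in this PR and not a harness-based test: `ShardRestoreSketch`, `ShardMeta`, and `lookupByStreamAndShardId` are hypothetical names, and `ShardMeta` is a simplified stand-in for `StreamShardMetadata` that keeps just three fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class ShardRestoreSketch {

    /** Hypothetical, simplified stand-in for StreamShardMetadata (assumption: only three fields). */
    static final class ShardMeta {
        final String streamName;
        final String shardId;
        final String endingSequenceNumber; // null while the shard is still open

        ShardMeta(String streamName, String shardId, String endingSequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.endingSequenceNumber = endingSequenceNumber;
        }

        // equals/hashCode cover every field, so closing a shard changes its identity.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ShardMeta)) {
                return false;
            }
            ShardMeta other = (ShardMeta) o;
            return streamName.equals(other.streamName)
                && shardId.equals(other.shardId)
                && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId, endingSequenceNumber);
        }
    }

    /**
     * Hypothetical lookup that matches only on stream name and shard id.
     * Returns the restored sequence number, or null if no restored shard matches.
     */
    static String lookupByStreamAndShardId(ShardMeta current, Map<ShardMeta, String> restored) {
        for (Map.Entry<ShardMeta, String> entry : restored.entrySet()) {
            ShardMeta candidate = entry.getKey();
            if (candidate.streamName.equals(current.streamName)
                    && candidate.shardId.equals(current.shardId)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // State as it was snapshotted: the shard was open (no ending sequence number).
        Map<ShardMeta, String> restored = new HashMap<>();
        restored.put(new ShardMeta("stream-a", "shardId-000000000000", null), "seq-42");

        // After restore, the same shard is reported as closed because of re-sharding.
        ShardMeta closed = new ShardMeta("stream-a", "shardId-000000000000", "seq-99");

        System.out.println(restored.get(closed));                       // null: equals-based lookup misses
        System.out.println(lookupByStreamAndShardId(closed, restored)); // seq-42
    }
}
```

    A harness-based test would exercise the same situation end to end: snapshot the consumer while the shard is open, then restore with the shard reported as closed.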
---