[ 
https://issues.apache.org/jira/browse/FLINK-10785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817209#comment-16817209
 ] 

vinoyang commented on FLINK-10785:
----------------------------------

When I update this migration test for Flink 1.8. I encounter the exception like 
before (my first comments). I fixed this problem with this code snippet in 
{{FlinkKinesisConsumerMigrationTest to generate snapshot file}}: 

 
{code:java}
private void writeSnapshot(String path, HashMap<StreamShardMetadata, 
SequenceNumber> state) throws Exception {
    final List<StreamShardHandle> initialDiscoveryShards = new 
ArrayList<>(state.size());
    for (StreamShardMetadata shardMetadata : state.keySet()) {
        Shard shard = new Shard();
        shard.setShardId(shardMetadata.getShardId());

        SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
        sequenceNumberRange.withStartingSequenceNumber("1");
        shard.setSequenceNumberRange(sequenceNumberRange);

        initialDiscoveryShards.add(new 
StreamShardHandle(shardMetadata.getStreamName(), shard));
    }

    final TestFetcher<String> fetcher = new TestFetcher<>(
        Collections.singletonList(TEST_STREAM_NAME),
        new TestSourceContext<>(),
        new TestRuntimeContext(true, 1, 0),
        TestUtils.getStandardProperties(),                                   
//one change point
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        state,
        initialDiscoveryShards);                                             
//another change point
{code}
Currently, the code is:

 

 
{code:java}
private void writeSnapshot(String path, HashMap<StreamShardMetadata, 
SequenceNumber> state) throws Exception {
   final TestFetcher<String> fetcher = new TestFetcher<>(
      Collections.singletonList(TEST_STREAM_NAME),
      new TestSourceContext<>(),
      new TestRuntimeContext(true, 1, 0),
      new Properties(),
      new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
      state,
      null);
{code}
[~tzulitai] Do you think my fix is correct?

 

 

> Update FlinkKinesisConsumerMigrationTest for 1.7
> ------------------------------------------------
>
>                 Key: FLINK-10785
>                 URL: https://issues.apache.org/jira/browse/FLINK-10785
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kinesis, Tests
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.8.0
>
>
> Update {{FlinkKinesisConsumerMigrationTest}} so that it covers restoring from 
> 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to