[ https://issues.apache.org/jira/browse/FLINK-10785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817209#comment-16817209 ]
vinoyang commented on FLINK-10785:
----------------------------------
When I updated this migration test for Flink 1.8, I encountered the same
exception as before (see my first comment). I fixed the problem by changing the
code in {{FlinkKinesisConsumerMigrationTest}} that generates the snapshot file:
{code:java}
private void writeSnapshot(String path, HashMap<StreamShardMetadata, SequenceNumber> state) throws Exception {
    // Build one shard handle per shard contained in the state
    final List<StreamShardHandle> initialDiscoveryShards = new ArrayList<>(state.size());
    for (StreamShardMetadata shardMetadata : state.keySet()) {
        Shard shard = new Shard();
        shard.setShardId(shardMetadata.getShardId());
        SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
        sequenceNumberRange.withStartingSequenceNumber("1");
        shard.setSequenceNumberRange(sequenceNumberRange);
        initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), shard));
    }

    final TestFetcher<String> fetcher = new TestFetcher<>(
        Collections.singletonList(TEST_STREAM_NAME),
        new TestSourceContext<>(),
        new TestRuntimeContext(true, 1, 0),
        TestUtils.getStandardProperties(), // one change point: was new Properties()
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        state,
        initialDiscoveryShards); // another change point: was null
{code}
Currently, the code is:
{code:java}
private void writeSnapshot(String path, HashMap<StreamShardMetadata, SequenceNumber> state) throws Exception {
    final TestFetcher<String> fetcher = new TestFetcher<>(
        Collections.singletonList(TEST_STREAM_NAME),
        new TestSourceContext<>(),
        new TestRuntimeContext(true, 1, 0),
        new Properties(),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        state,
        null);
{code}
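For readers without the connector test classes at hand, the core of the fix (deriving one initial shard handle per entry in the state instead of passing null) can be sketched in plain Java. This is only an illustration: {{ShardMeta}}, {{ShardHandle}} and {{initialDiscoveryShards}} below are hypothetical stand-ins, not the real Flink or AWS SDK types, and the starting sequence number "1" is simply copied from the snippet above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InitialShardsSketch {

    /** Hypothetical stand-in for StreamShardMetadata: only the fields the fix reads. */
    static final class ShardMeta {
        final String streamName;
        final String shardId;

        ShardMeta(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }
    }

    /** Hypothetical stand-in for StreamShardHandle plus its starting sequence number. */
    static final class ShardHandle {
        final String streamName;
        final String shardId;
        final String startingSequenceNumber;

        ShardHandle(String streamName, String shardId, String startingSequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.startingSequenceNumber = startingSequenceNumber;
        }
    }

    /** Derives one handle per state key, mirroring the loop in the proposed fix. */
    static List<ShardHandle> initialDiscoveryShards(Map<ShardMeta, String> state) {
        List<ShardHandle> shards = new ArrayList<>(state.size());
        for (ShardMeta meta : state.keySet()) {
            // "1" is the placeholder starting sequence number used in the fix
            shards.add(new ShardHandle(meta.streamName, meta.shardId, "1"));
        }
        return shards;
    }

    public static void main(String[] args) {
        // Map values play the role of the checkpointed SequenceNumber per shard
        Map<ShardMeta, String> state = new LinkedHashMap<>();
        state.put(new ShardMeta("test-stream", "shardId-000000000000"), "12345");
        state.put(new ShardMeta("test-stream", "shardId-000000000001"), "67890");

        for (ShardHandle handle : initialDiscoveryShards(state)) {
            System.out.println(handle.streamName + " " + handle.shardId
                + " " + handle.startingSequenceNumber);
        }
    }
}
```

Running {{main}} prints one line per shard in the state. The point of the sketch is only that the list of initially discovered shards has the same size as the state and covers the same shard IDs.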
[~tzulitai] Do you think my fix is correct?
> Update FlinkKinesisConsumerMigrationTest for 1.7
> ------------------------------------------------
>
> Key: FLINK-10785
> URL: https://issues.apache.org/jira/browse/FLINK-10785
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Kinesis, Tests
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.8.0
>
>
> Update {{FlinkKinesisConsumerMigrationTest}} so that it covers restoring from
> 1.7.