Repository: flink Updated Branches: refs/heads/master 6642768ad -> c384e52e6
[FLINK-7429] [kinesis] Add IT tests for migration from 1.3 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c384e52e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c384e52e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c384e52e Branch: refs/heads/master Commit: c384e52e647da457ce5127863148d03c93c1a4aa Parents: 04add8d Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Fri Aug 18 11:27:38 2017 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Aug 25 11:13:34 2017 +0200 ---------------------------------------------------------------------- .../FlinkKinesisConsumerMigrationTest.java | 238 +++++++++++++++++++ ...sumer-migration-test-flink1.3-empty-snapshot | Bin 0 -> 13975 bytes ...is-consumer-migration-test-flink1.3-snapshot | Bin 0 -> 14043 bytes 3 files changed, 238 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c384e52e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java new file mode 100644 index 0000000..364560c --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.streaming.util.migration.MigrationTestUtil; +import org.apache.flink.streaming.util.migration.MigrationVersion; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import 
org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +
/**
 * Tests for checking whether {@link FlinkKinesisConsumer} can restore from snapshots that were
 * done using an older {@code FlinkKinesisConsumer}.
 *
 * <p>For regenerating the binary snapshot files run {@link #writeSnapshot()} on the corresponding
 * Flink release-* branch.
 */
@RunWith(Parameterized.class)
public class FlinkKinesisConsumerMigrationTest {

	/** Common prefix of all snapshot resources; the Flink version and a suffix are appended to it. */
	private static final String SNAPSHOT_PATH_PREFIX = "src/test/resources/kinesis-consumer-migration-test-flink";

	/** Suffix of the snapshot resource that contains {@link #TEST_STATE}. */
	private static final String SNAPSHOT_SUFFIX = "-snapshot";

	/** Suffix of the snapshot resource that was written with an empty state map. */
	private static final String EMPTY_SNAPSHOT_SUFFIX = "-empty-snapshot";

	/**
	 * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3)
	 * TODO and remove all @Ignore annotations on the writeSnapshot() method to generate savepoints
	 */
	private final MigrationVersion flinkGenerateSavepointVersion = null;

	/** The state written into (and expected back from) the non-empty snapshot. */
	private static final HashMap<StreamShardMetadata, SequenceNumber> TEST_STATE = new HashMap<>();
	static {
		StreamShardMetadata shardMetadata = new StreamShardMetadata();
		shardMetadata.setStreamName("fakeStream1");
		shardMetadata.setShardId(KinesisShardIdGenerator.generateFromShardOrder(0));

		TEST_STATE.put(shardMetadata, new SequenceNumber("987654321"));
	}

	/** The Flink version whose snapshot files the current parameterized run restores from. */
	private final MigrationVersion testMigrateVersion;

	/**
	 * Supplies the Flink versions for which snapshot files are restored.
	 *
	 * @return the versions to run the restore tests against
	 */
	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
	public static Collection<MigrationVersion> parameters() {
		return Arrays.asList(MigrationVersion.v1_3);
	}

	public FlinkKinesisConsumerMigrationTest(MigrationVersion testMigrateVersion) {
		this.testMigrateVersion = testMigrateVersion;
	}

	/**
	 * Manually run this to write binary snapshot data.
	 *
	 * @throws IllegalStateException if {@link #flinkGenerateSavepointVersion} has not been set
	 */
	@Ignore
	@Test
	public void writeSnapshot() throws Exception {
		// Without this guard the version would be concatenated as the string "null"
		// and the files would be written under a meaningless name.
		if (flinkGenerateSavepointVersion == null) {
			throw new IllegalStateException(
				"flinkGenerateSavepointVersion must be set to the Flink version being written before generating snapshots.");
		}

		writeSnapshot(snapshotPath(flinkGenerateSavepointVersion, SNAPSHOT_SUFFIX), TEST_STATE);

		// write empty state snapshot
		writeSnapshot(
			snapshotPath(flinkGenerateSavepointVersion, EMPTY_SNAPSHOT_SUFFIX),
			new HashMap<StreamShardMetadata, SequenceNumber>());
	}

	/** Restores from the empty snapshot and checks that the consumer reports no restored state. */
	@Test
	public void testRestoreWithEmptyState() throws Exception {
		final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(mock(KinesisDataFetcher.class));

		StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator = new StreamSource<>(consumerFunction);

		final AbstractStreamOperatorTestHarness<String> testHarness =
			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

		testHarness.setup();
		MigrationTestUtil.restoreFromSnapshot(
			testHarness,
			snapshotPath(testMigrateVersion, EMPTY_SNAPSHOT_SUFFIX), testMigrateVersion);
		testHarness.open();

		// assert that no state was restored
		assertTrue(consumerFunction.getRestoredState().isEmpty());

		consumerOperator.close();
		consumerOperator.cancel();
	}

	/** Restores from the non-empty snapshot and checks that exactly {@link #TEST_STATE} comes back. */
	@Test
	public void testRestore() throws Exception {
		final DummyFlinkKinesisConsumer<String> consumerFunction = new DummyFlinkKinesisConsumer<>(mock(KinesisDataFetcher.class));

		StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator =
			new StreamSource<>(consumerFunction);

		final AbstractStreamOperatorTestHarness<String> testHarness =
			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

		testHarness.setup();
		MigrationTestUtil.restoreFromSnapshot(
			testHarness,
			snapshotPath(testMigrateVersion, SNAPSHOT_SUFFIX), testMigrateVersion);
		testHarness.open();

		// assert that state is correctly restored
		assertNotEquals(null, consumerFunction.getRestoredState());
		assertEquals(1, consumerFunction.getRestoredState().size());
		assertEquals(TEST_STATE, consumerFunction.getRestoredState());

		consumerOperator.close();
		consumerOperator.cancel();
	}

	// ------------------------------------------------------------------------

	/**
	 * Builds the path of a snapshot resource for the given version.
	 *
	 * @param version the Flink version that is part of the file name
	 * @param suffix the trailing part of the file name
	 * @return the path of the snapshot file, relative to the module directory
	 */
	private static String snapshotPath(MigrationVersion version, String suffix) {
		return SNAPSHOT_PATH_PREFIX + version + suffix;
	}

	/**
	 * Runs a consumer backed by a mocked fetcher, snapshots it and writes the snapshot to a file.
	 *
	 * @param path the file the snapshot is written to
	 * @param state the state the mocked fetcher returns from {@code snapshotState()}
	 * @throws AssertionError if the thread running the source failed
	 */
	@SuppressWarnings("unchecked")
	private void writeSnapshot(String path, HashMap<StreamShardMetadata, SequenceNumber> state) throws Exception {
		final OneShotLatch latch = new OneShotLatch();

		final KinesisDataFetcher<String> fetcher = mock(KinesisDataFetcher.class);
		// the mocked fetcher signals the latch as soon as runFetcher() is invoked
		doAnswer(new Answer<Void>() {
			@Override
			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
				latch.trigger();
				return null;
			}
		}).when(fetcher).runFetcher();
		when(fetcher.snapshotState()).thenReturn(state);

		final DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(fetcher);

		StreamSource<String, DummyFlinkKinesisConsumer<String>> consumerOperator = new StreamSource<>(consumer);

		final AbstractStreamOperatorTestHarness<String> testHarness =
			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);

		testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

		testHarness.setup();
		testHarness.open();

		final AtomicReference<Throwable> error = new AtomicReference<>();

		// run the source asynchronously
		Thread runner = new Thread() {
			@Override
			public void run() {
				try {
					consumer.run(mock(SourceFunction.SourceContext.class));
				} catch (Throwable t) {
					// record the failure first, then release the waiting thread: if the source
					// fails before runFetcher() is reached, nobody else triggers the latch
					error.set(t);
					latch.trigger();
				}
			}
		};
		runner.start();

		latch.await();
		// do not snapshot a source that already failed
		failOnSourceError(error);

		final OperatorStateHandles snapshot;
		synchronized (testHarness.getCheckpointLock()) {
			snapshot = testHarness.snapshot(0L, 0L);
		}

		OperatorSnapshotUtil.writeStateHandle(snapshot, path);

		consumerOperator.close();
		runner.join();
		// the source thread has terminated, so any failure it hit is visible now
		failOnSourceError(error);
	}

	/**
	 * Throws if the source thread recorded a failure.
	 *
	 * @param error holder of the failure recorded by the source thread, if any
	 * @throws AssertionError wrapping the recorded failure as its cause
	 */
	private static void failOnSourceError(AtomicReference<Throwable> error) {
		final Throwable failure = error.get();
		if (failure != null) {
			throw new AssertionError("The source thread failed while writing the snapshot.", failure);
		}
	}

	/**
	 * A {@link FlinkKinesisConsumer} with a dummy configuration whose {@code createFetcher}
	 * always returns the fetcher handed to the constructor.
	 */
	private static class DummyFlinkKinesisConsumer<T> extends FlinkKinesisConsumer<T> {

		private static final Properties DUMMY_CONFIG = new Properties();
		static {
			DUMMY_CONFIG.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
			DUMMY_CONFIG.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
			DUMMY_CONFIG.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		}

		private final KinesisDataFetcher<T> mockFetcher;

		@SuppressWarnings("unchecked")
		DummyFlinkKinesisConsumer(KinesisDataFetcher<T> mockFetcher) {
			super("dummy-topic", mock(KinesisDeserializationSchema.class), DUMMY_CONFIG);
			this.mockFetcher = mockFetcher;
		}

		@Override
		protected KinesisDataFetcher<T> createFetcher(
				List<String> streams,
				SourceContext<T> sourceContext,
				RuntimeContext runtimeContext,
				Properties configProps,
				KinesisDeserializationSchema<T> deserializer) {
			return mockFetcher;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c384e52e/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.3-empty-snapshot ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.3-empty-snapshot b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.3-empty-snapshot new file mode 100644 index 0000000..aa981c0 Binary files /dev/null and b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.3-empty-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/c384e52e/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.3-snapshot ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.3-snapshot b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.3-snapshot new
file mode 100644 index 0000000..ddf8a4d Binary files /dev/null and b/flink-connectors/flink-connector-kinesis/src/test/resources/kinesis-consumer-migration-test-flink1.3-snapshot differ