http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java new file mode 100644 index 0000000..dbf95f9 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.streaming.connectors.kinesis;

import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import static org.junit.Assert.assertNull;

/**
 * Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle.
 *
 * <p>The first groups exercise the static validators in {@link KinesisConfigUtil}; the later groups
 * check snapshot behaviour before the source runs, and how the consumer seeds its
 * {@link KinesisDataFetcher} (mocked via PowerMock) with or without restored state.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
public class FlinkKinesisConsumerTest {

	// JUnit 4 requires fields annotated with @Rule to be public; a private rule field is
	// rejected by rule validation, so the expect()/expectMessage() checks would not apply.
	@Rule
	public final ExpectedException exception = ExpectedException.none();

	// ----------------------------------------------------------------------
	// KinesisConfigUtil.validateAwsConfiguration() tests
	// ----------------------------------------------------------------------

	@Test
	public void testMissingAwsRegionInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");

		Properties testConfig = new Properties();
		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

		KinesisConfigUtil.validateAwsConfiguration(testConfig);
	}

	@Test
	public void testUnrecognizableAwsRegionInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid AWS region");

		Properties testConfig = new Properties();
		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

		KinesisConfigUtil.validateAwsConfiguration(testConfig);
	}

	@Test
	public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
			"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");

		// no credentials provider type set: validation is expected to fall back to BASIC
		Properties testConfig = new Properties();
		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

		KinesisConfigUtil.validateAwsConfiguration(testConfig);
	}

	@Test
	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
			"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");

		Properties testConfig = new Properties();
		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");

		KinesisConfigUtil.validateAwsConfiguration(testConfig);
	}

	@Test
	public void testUnrecognizableCredentialProviderTypeInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid AWS Credential Provider Type");

		Properties testConfig = new Properties();
		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

		KinesisConfigUtil.validateAwsConfiguration(testConfig);
	}

	// ----------------------------------------------------------------------
	// KinesisConfigUtil.validateConsumerConfiguration() tests
	// ----------------------------------------------------------------------

	@Test
	public void testUnrecognizableStreamInitPositionTypeInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid initial position in stream");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableIntForGetRecordsRetriesInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableIntForGetRecordsMaxCountInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");

		Properties testConfig = new Properties();
		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");

		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
	}

	// ----------------------------------------------------------------------
	// KinesisConfigUtil.validateProducerConfiguration() tests
	// ----------------------------------------------------------------------

	@Test
	public void testUnparsableLongForCollectionMaxCountInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request");

		Properties testConfig = new Properties();
		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong");

		KinesisConfigUtil.validateProducerConfiguration(testConfig);
	}

	@Test
	public void testUnparsableLongForAggregationMaxCountInConfig() {
		exception.expect(IllegalArgumentException.class);
		exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record");

		Properties testConfig = new Properties();
		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
		testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
		testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong");

		KinesisConfigUtil.validateProducerConfiguration(testConfig);
	}

	// ----------------------------------------------------------------------
	// Tests related to state initialization
	// ----------------------------------------------------------------------

	@Test
	public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception {
		Properties config = new Properties();
		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);

		// assertNull reports the unexpected value on failure, unlike assertTrue(x == null)
		assertNull(consumer.snapshotState(123, 123)); // arbitrary checkpoint id and timestamp
	}

	@Test
	public void testSnapshotStateShouldBeNullIfSourceNotRun() throws Exception {
		Properties config = new Properties();
		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
		consumer.open(new Configuration()); // only opened, not run

		assertNull(consumer.snapshotState(123, 123)); // arbitrary checkpoint id and timestamp
	}

	// ----------------------------------------------------------------------
	// Tests related to fetcher initialization
	// ----------------------------------------------------------------------

	@Test
	@SuppressWarnings("unchecked")
	public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);

		// assume the given config is correct: mockStatic turns the validators into no-ops, and the
		// doNothing() stub is completed by naming the static method, so no stubbing is left pending
		PowerMockito.mockStatic(KinesisConfigUtil.class);
		PowerMockito.doNothing().when(KinesisConfigUtil.class);
		KinesisConfigUtil.validateConsumerConfiguration(Mockito.any(Properties.class));

		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
			"fakeStream", new Properties(), 10, 2);
		consumer.open(new Configuration());
		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(false);
	}

	@Test
	@SuppressWarnings("unchecked")
	public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);

		// assume the given config is correct: mockStatic turns the validators into no-ops, and the
		// doNothing() stub is completed by naming the static method, so no stubbing is left pending
		PowerMockito.mockStatic(KinesisConfigUtil.class);
		PowerMockito.doNothing().when(KinesisConfigUtil.class);
		KinesisConfigUtil.validateConsumerConfiguration(Mockito.any(Properties.class));

		// restored state spanning two streams: 3 shards of fakeStream1 and 2 shards of fakeStream2
		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
		fakeRestoredState.put(
			new KinesisStreamShard("fakeStream1",
				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
			new SequenceNumber(UUID.randomUUID().toString()));
		fakeRestoredState.put(
			new KinesisStreamShard("fakeStream1",
				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
			new SequenceNumber(UUID.randomUUID().toString()));
		fakeRestoredState.put(
			new KinesisStreamShard("fakeStream1",
				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
			new SequenceNumber(UUID.randomUUID().toString()));
		fakeRestoredState.put(
			new KinesisStreamShard("fakeStream2",
				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
			new SequenceNumber(UUID.randomUUID().toString()));
		fakeRestoredState.put(
			new KinesisStreamShard("fakeStream2",
				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
			new SequenceNumber(UUID.randomUUID().toString()));

		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
			"fakeStream", new Properties(), 10, 2);
		consumer.restoreState(fakeRestoredState);
		consumer.open(new Configuration());
		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

		// every restored shard must be both marked as discovered and registered with its sequence number
		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java new file mode 100644 index 0000000..e79f9b1 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; +import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.HashMap; +import java.util.Map; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertTrue; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(TestableKinesisDataFetcher.class) +public class KinesisDataFetcherTest { + + @Test(expected = RuntimeException.class) + public void testIfNoShardsAreFoundShouldThrowException() throws Exception { + List<String> fakeStreams = new LinkedList<>(); + fakeStreams.add("fakeStream1"); + fakeStreams.add("fakeStream2"); + + HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams); + + TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + fakeStreams, + new Properties(), + 10, + 2, + new AtomicReference<Throwable>(), + new LinkedList<KinesisStreamShardState>(), + subscribedStreamsToLastSeenShardIdsUnderTest, + 
FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour()); + + fetcher.setIsRestoringFromFailure(false); // not restoring + + fetcher.runFetcher(); // this should throw RuntimeException + } + + @Test + public void testStreamToLastSeenShardStateIsCorrectlySetWhenNotRestoringFromFailure() throws Exception { + List<String> fakeStreams = new LinkedList<>(); + fakeStreams.add("fakeStream1"); + fakeStreams.add("fakeStream2"); + fakeStreams.add("fakeStream3"); + fakeStreams.add("fakeStream4"); + + HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams); + + Map<String,Integer> streamToShardCount = new HashMap<>(); + Random rand = new Random(); + for (String fakeStream : fakeStreams) { + streamToShardCount.put(fakeStream, rand.nextInt(5)+1); + } + + final TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + fakeStreams, + new Properties(), + 10, + 2, + new AtomicReference<Throwable>(), + new LinkedList<KinesisStreamShardState>(), + subscribedStreamsToLastSeenShardIdsUnderTest, + FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount)); + + fetcher.setIsRestoringFromFailure(false); + + PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class)); + Thread runFetcherThread = new Thread(new Runnable() { + @Override + public void run() { + try { + fetcher.runFetcher(); + } catch (Exception e) { + // + } + } + }); + runFetcherThread.start(); + Thread.sleep(1000); // sleep a while before closing + fetcher.shutdownFetcher(); + + + // assert that the streams tracked in the state are identical to the subscribed streams + Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet(); + assertTrue(streamsInState.size() == fakeStreams.size()); + assertTrue(streamsInState.containsAll(fakeStreams)); + + // assert that the last seen shards in state is 
correctly set + for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) { + assertTrue( + streamToLastSeenShard.getValue().equals( + KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1))); + } + } + + @Test + public void testStreamToLastSeenShardStateIsCorrectlySetWhenNoNewShardsSinceRestoredCheckpoint() throws Exception { + List<String> fakeStreams = new LinkedList<>(); + fakeStreams.add("fakeStream1"); + fakeStreams.add("fakeStream2"); + + Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>(); + + // fakeStream1 has 3 shards before restore + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))), + UUID.randomUUID().toString()); + + // fakeStream2 has 2 shards before restore + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + UUID.randomUUID().toString()); + + Map<String,Integer> streamToShardCount = new HashMap<>(); + streamToShardCount.put("fakeStream1", 3); // fakeStream1 will still have 3 shards after restore + streamToShardCount.put("fakeStream2", 2); // fakeStream2 will still have 2 shards after restore + + HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = + 
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams); + + final TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + fakeStreams, + new Properties(), + 10, + 2, + new AtomicReference<Throwable>(), + new LinkedList<KinesisStreamShardState>(), + subscribedStreamsToLastSeenShardIdsUnderTest, + FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount)); + + for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) { + fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId()); + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue()))); + } + + fetcher.setIsRestoringFromFailure(true); + + PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class)); + Thread runFetcherThread = new Thread(new Runnable() { + @Override + public void run() { + try { + fetcher.runFetcher(); + } catch (Exception e) { + // + } + } + }); + runFetcherThread.start(); + Thread.sleep(1000); // sleep a while before closing + fetcher.shutdownFetcher(); + + // assert that the streams tracked in the state are identical to the subscribed streams + Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet(); + assertTrue(streamsInState.size() == fakeStreams.size()); + assertTrue(streamsInState.containsAll(fakeStreams)); + + // assert that the last seen shards in state is correctly set + for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) { + assertTrue( + streamToLastSeenShard.getValue().equals( + KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1))); + } + } + + @Test + public void 
testStreamToLastSeenShardStateIsCorrectlySetWhenNewShardsFoundSinceRestoredCheckpoint() throws Exception { + List<String> fakeStreams = new LinkedList<>(); + fakeStreams.add("fakeStream1"); + fakeStreams.add("fakeStream2"); + + Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>(); + + // fakeStream1 has 3 shards before restore + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))), + UUID.randomUUID().toString()); + + // fakeStream2 has 2 shards before restore + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + UUID.randomUUID().toString()); + + Map<String,Integer> streamToShardCount = new HashMap<>(); + streamToShardCount.put("fakeStream1", 3+1); // fakeStream1 had 3 shards before & 1 new shard after restore + streamToShardCount.put("fakeStream2", 2+3); // fakeStream2 had 2 shards before & 3 new shard after restore + + HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams); + + // using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore + final TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + fakeStreams, + new Properties(), 
+ 10, + 2, + new AtomicReference<Throwable>(), + new LinkedList<KinesisStreamShardState>(), + subscribedStreamsToLastSeenShardIdsUnderTest, + FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount)); + + for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) { + fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId()); + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue()))); + } + + fetcher.setIsRestoringFromFailure(true); + + PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class)); + Thread runFetcherThread = new Thread(new Runnable() { + @Override + public void run() { + try { + fetcher.runFetcher(); + } catch (Exception e) { + // + } + } + }); + runFetcherThread.start(); + Thread.sleep(1000); // sleep a while before closing + fetcher.shutdownFetcher(); + + // assert that the streams tracked in the state are identical to the subscribed streams + Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet(); + assertTrue(streamsInState.size() == fakeStreams.size()); + assertTrue(streamsInState.containsAll(fakeStreams)); + + // assert that the last seen shards in state is correctly set + for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) { + assertTrue( + streamToLastSeenShard.getValue().equals( + KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1))); + } + } + + @Test + public void testStreamToLastSeenShardStateIsCorrectlySetWhenNoNewShardsSinceRestoredCheckpointAndSomeStreamsDoNotExist() throws Exception { + List<String> fakeStreams = new LinkedList<>(); + fakeStreams.add("fakeStream1"); + fakeStreams.add("fakeStream2"); + fakeStreams.add("fakeStream3"); // 
fakeStream3 will not have any shards + fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards + + Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>(); + + // fakeStream1 has 3 shards before restore + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))), + UUID.randomUUID().toString()); + + // fakeStream2 has 2 shards before restore + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + UUID.randomUUID().toString()); + + Map<String,Integer> streamToShardCount = new HashMap<>(); + streamToShardCount.put("fakeStream1", 3); // fakeStream1 has fixed 3 shards + streamToShardCount.put("fakeStream2", 2); // fakeStream2 has fixed 2 shards + streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3 + streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4 + + HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams); + + // using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore + final TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + fakeStreams, + new Properties(), + 
10, + 2, + new AtomicReference<Throwable>(), + new LinkedList<KinesisStreamShardState>(), + subscribedStreamsToLastSeenShardIdsUnderTest, + FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount)); + + for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) { + fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId()); + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue()))); + } + + fetcher.setIsRestoringFromFailure(true); + + PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class)); + Thread runFetcherThread = new Thread(new Runnable() { + @Override + public void run() { + try { + fetcher.runFetcher(); + } catch (Exception e) { + // + } + } + }); + runFetcherThread.start(); + Thread.sleep(1000); // sleep a while before closing + fetcher.shutdownFetcher(); + + // assert that the streams tracked in the state are identical to the subscribed streams + Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet(); + assertTrue(streamsInState.size() == fakeStreams.size()); + assertTrue(streamsInState.containsAll(fakeStreams)); + + // assert that the last seen shards in state is correctly set + assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream1").equals( + KinesisShardIdGenerator.generateFromShardOrder(2))); + assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream2").equals( + KinesisShardIdGenerator.generateFromShardOrder(1))); + assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream3") == null); + assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4") == null); + } + + @Test + public void testStreamToLastSeenShardStateIsCorrectlySetWhenNewShardsFoundSinceRestoredCheckpointAndSomeStreamsDoNotExist() throws 
Exception { + List<String> fakeStreams = new LinkedList<>(); + fakeStreams.add("fakeStream1"); + fakeStreams.add("fakeStream2"); + fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards + fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards + + Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>(); + + // fakeStream1 has 3 shards before restore + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream1", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))), + UUID.randomUUID().toString()); + + // fakeStream2 has 2 shards before restore + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), + UUID.randomUUID().toString()); + restoredStateUnderTest.put( + new KinesisStreamShard( + "fakeStream2", + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), + UUID.randomUUID().toString()); + + Map<String,Integer> streamToShardCount = new HashMap<>(); + streamToShardCount.put("fakeStream1", 3+1); // fakeStream1 had 3 shards before & 1 new shard after restore + streamToShardCount.put("fakeStream2", 2+3); // fakeStream2 had 2 shards before & 2 new shard after restore + streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3 + streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4 + + HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest = + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams); + + 
// using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore + final TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + fakeStreams, + new Properties(), + 10, + 2, + new AtomicReference<Throwable>(), + new LinkedList<KinesisStreamShardState>(), + subscribedStreamsToLastSeenShardIdsUnderTest, + FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount)); + + for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) { + fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId()); + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue()))); + } + + fetcher.setIsRestoringFromFailure(true); + + PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class)); + Thread runFetcherThread = new Thread(new Runnable() { + @Override + public void run() { + try { + fetcher.runFetcher(); + } catch (Exception e) { + // + } + } + }); + runFetcherThread.start(); + Thread.sleep(1000); // sleep a while before closing + fetcher.shutdownFetcher(); + + // assert that the streams tracked in the state are identical to the subscribed streams + Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet(); + assertTrue(streamsInState.size() == fakeStreams.size()); + assertTrue(streamsInState.containsAll(fakeStreams)); + + // assert that the last seen shards in state is correctly set + assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream1").equals( + KinesisShardIdGenerator.generateFromShardOrder(3))); + assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream2").equals( + KinesisShardIdGenerator.generateFromShardOrder(4))); + assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream3") == null); + 
assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4") == null); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java new file mode 100644 index 0000000..96764a4 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.services.kinesis.model.HashKeyRange; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.commons.lang.StringUtils; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; +import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; +import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher; +import org.junit.Test; +import org.mockito.Mockito; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.LinkedList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertTrue; + +public class ShardConsumerTest { + + @Test + public void testCorrectNumOfCollectedRecordsAndUpdatedState() { + KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard( + "fakeStream", + new Shard() + .withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)) + .withHashKeyRange( + new HashKeyRange() + .withStartingHashKey("0") + .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString()))); + + LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>(); + subscribedShardsStateUnderTest.add( + new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState"))); + + TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + Collections.singletonList("fakeStream"), + new Properties(), + 10, + 2, + 
new AtomicReference<Throwable>(), + subscribedShardsStateUnderTest, + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")), + Mockito.mock(KinesisProxyInterface.class)); + + new ShardConsumer<>( + fetcher, + 0, + subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(), + subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(), + FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9)).run(); + + assertTrue(fetcher.getNumOfElementsCollected() == 1000); + assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals( + SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())); + } + + @Test + public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() { + KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard( + "fakeStream", + new Shard() + .withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)) + .withHashKeyRange( + new HashKeyRange() + .withStartingHashKey("0") + .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString()))); + + LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>(); + subscribedShardsStateUnderTest.add( + new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState"))); + + TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + Collections.singletonList("fakeStream"), + new Properties(), + 10, + 2, + new AtomicReference<Throwable>(), + subscribedShardsStateUnderTest, + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")), + Mockito.mock(KinesisProxyInterface.class)); + + new ShardConsumer<>( + fetcher, + 0, + subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(), + subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(), + // Get a total of 1000 records with 9 getRecords() 
calls, + // and the 7th getRecords() call will encounter an unexpected expired shard iterator + FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7)).run(); + + assertTrue(fetcher.getNumOfElementsCollected() == 1000); + assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals( + SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java new file mode 100644 index 0000000..6e02a55 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kinesis.manualtests; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; +import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.flink.util.Collector; + +import java.nio.ByteBuffer; +import java.util.Properties; + +/** + * This is a manual test for the AWS Kinesis connector in Flink. 
+ * + * It uses: + * - A custom KinesisSerializationSchema + * - A custom KinesisPartitioner + * + * Invocation: + * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX + */ +public class ManualConsumerProducerTest { + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(4); + + DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator()); + + Properties kinesisProducerConfig = new Properties(); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); + + FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>( + new KinesisSerializationSchema<String>() { + @Override + public ByteBuffer serialize(String element) { + return ByteBuffer.wrap(element.getBytes()); + } + + // every 10th element goes into a different stream + @Override + public String getTargetStream(String element) { + if(element.split("-")[0].endsWith("0")) { + return "flink-test-2"; + } + return null; // send to default stream + } + }, + kinesisProducerConfig + ); + + kinesis.setFailOnError(true); + kinesis.setDefaultStream("test-flink"); + kinesis.setDefaultPartition("0"); + kinesis.setCustomPartitioner(new KinesisPartitioner<String>() { + @Override + public String getPartitionId(String element) { + int l = element.length(); + return element.substring(l - 1, l); + } + }); + simpleStringStream.addSink(kinesis); + + + // consuming topology + Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + 
consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); + consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region")); + DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>("test-flink", new SimpleStringSchema(), consumerProps)); + // validate consumed records for correctness + consuming.flatMap(new FlatMapFunction<String, String>() { + @Override + public void flatMap(String value, Collector<String> out) throws Exception { + String[] parts = value.split("-"); + try { + long l = Long.parseLong(parts[0]); + if(l < 0) { + throw new RuntimeException("Negative"); + } + } catch(NumberFormatException nfe) { + throw new RuntimeException("First part of '" + value + "' is not a valid numeric type"); + } + if(parts[1].length() != 12) { + throw new RuntimeException("Second part of '" + value + "' doesn't have 12 characters"); + } + } + }); + consuming.print(); + + see.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java new file mode 100644 index 0000000..2e452c1 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kinesis.manualtests; + +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This test first starts a data generator, producing data into kinesis. + * Then, it starts a consuming topology, ensuring that all records up to a certain + * point have been seen. 
+ * + * Invocation: + * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX + */ +public class ManualExactlyOnceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ManualExactlyOnceTest.class); + + static final int TOTAL_EVENT_COUNT = 1000; // the producer writes one per 10 ms, so it runs for 10k ms = 10 seconds + + public static void main(String[] args) throws Exception { + final ParameterTool pt = ParameterTool.fromArgs(args); + LOG.info("Starting exactly once test"); + + final String streamName = "flink-test-" + UUID.randomUUID().toString(); + final String accessKey = pt.getRequired("accessKey"); + final String secretKey = pt.getRequired("secretKey"); + final String region = pt.getRequired("region"); + + Properties configProps = new Properties(); + configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey); + configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey); + configProps.setProperty(AWSConfigConstants.AWS_REGION, region); + AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps); + + // create a stream for the test: + client.createStream(streamName, 1); + + // wait until stream has been created + DescribeStreamResult status = client.describeStream(streamName); + LOG.info("status {}" ,status); + while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) { + status = client.describeStream(streamName); + LOG.info("Status of stream {}", status); + Thread.sleep(1000); + } + + final Configuration flinkConfig = new Configuration(); + flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false); + flink.start(); + + final int flinkPort = 
flink.getLeaderRPCPort(); + + try { + final AtomicReference<Throwable> producerError = new AtomicReference<>(); + Thread producerThread = KinesisEventsGeneratorProducerThread.create( + TOTAL_EVENT_COUNT, 2, + accessKey, secretKey, region, streamName, + producerError, flinkPort, flinkConfig); + producerThread.start(); + + final AtomicReference<Throwable> consumerError = new AtomicReference<>(); + Thread consumerThread = ExactlyOnceValidatingConsumerThread.create( + TOTAL_EVENT_COUNT, 200, 2, 500, 500, + accessKey, secretKey, region, streamName, + consumerError, flinkPort, flinkConfig); + consumerThread.start(); + + boolean deadlinePassed = false; + long deadline = System.currentTimeMillis() + (1000 * 2 * 60); // wait at most for two minutes + // wait until both producer and consumer finishes, or an unexpected error is thrown + while ((consumerThread.isAlive() || producerThread.isAlive()) && + (producerError.get() == null && consumerError.get() == null)) { + Thread.sleep(1000); + if (System.currentTimeMillis() >= deadline) { + LOG.warn("Deadline passed"); + deadlinePassed = true; + break; // enough waiting + } + } + + if (producerThread.isAlive()) { + producerThread.interrupt(); + } + + if (consumerThread.isAlive()) { + consumerThread.interrupt(); + } + + if (producerError.get() != null) { + LOG.info("+++ TEST failed! +++"); + throw new RuntimeException("Producer failed", producerError.get()); + } + if (consumerError.get() != null) { + LOG.info("+++ TEST failed! +++"); + throw new RuntimeException("Consumer failed", consumerError.get()); + } + + if (!deadlinePassed) { + LOG.info("+++ TEST passed! +++"); + } else { + LOG.info("+++ TEST failed! 
+++"); + } + + } finally { + client.deleteStream(streamName); + client.shutdown(); + + // stopping flink + flink.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java new file mode 100644 index 0000000..6abea2a --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.manualtests; + +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; +import com.amazonaws.services.kinesis.model.PutRecordsResult; +import com.amazonaws.services.kinesis.model.PutRecordsRequest; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; +import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This test first starts a data generator, producing data into kinesis. + * Then, it starts a consuming topology, ensuring that all records up to a certain + * point have been seen. While the data generator and consuming topology is running, + * the kinesis stream is resharded two times. 
+ * + * Invocation: + * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX + */
public class ManualExactlyOnceWithStreamReshardingTest {

	private static final Logger LOG = LoggerFactory.getLogger(ManualExactlyOnceWithStreamReshardingTest.class);

	static final int TOTAL_EVENT_COUNT = 20000; // a large enough record count so we can test resharding

	/**
	 * Runs the test.
	 *
	 * <p>Creates a one-shard stream and starts a local mini cluster. It then runs three threads concurrently:
	 * a producer writing ids {@code 0 .. TOTAL_EVENT_COUNT - 1}, the exactly-once validating consumer, and a
	 * resharding thread that first splits the shard and later merges the two children. Once the mini cluster
	 * has started, the stream is deleted and the cluster stopped on exit.
	 *
	 * @param args requires {@code --accessKey}, {@code --secretKey} and {@code --region}
	 * @throws RuntimeException if the producer or the consumer reported an error
	 */
	public static void main(String[] args) throws Exception {
		final ParameterTool pt = ParameterTool.fromArgs(args);
		LOG.info("Starting exactly once with stream resharding test");

		final String streamName = "flink-test-" + UUID.randomUUID().toString();
		final String accessKey = pt.getRequired("accessKey");
		final String secretKey = pt.getRequired("secretKey");
		final String region = pt.getRequired("region");

		final Properties configProps = new Properties();
		configProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
		configProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
		configProps.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		configProps.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "0");
		final AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);

		// the stream is first created with 1 shard
		client.createStream(streamName, 1);

		// wait until stream has been created
		DescribeStreamResult status = client.describeStream(streamName);
		LOG.info("status {}", status);
		while (!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
			status = client.describeStream(streamName);
			LOG.info("Status of stream {}", status);
			Thread.sleep(1000);
		}

		final Configuration flinkConfig = new Configuration();
		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
		flink.start();

		final int flinkPort = flink.getLeaderRPCPort();

		try {
			// we have to use a manual generator here instead of the FlinkKinesisProducer
			// because the FlinkKinesisProducer currently has a problem where records will be resent to a shard
			// when resharding happens; this affects the consumer exactly-once validation test and will never pass
			final AtomicReference<Throwable> producerError = new AtomicReference<>();
			Runnable manualGenerate = new Runnable() {
				@Override
				public void run() {
					// a client owned by this thread; it is shut down once producing stops for any reason
					AmazonKinesisClient producerClient = AWSUtil.createKinesisClient(configProps);
					try {
						int count = 0;
						final int batchSize = 30;
						while (count < TOTAL_EVENT_COUNT) {
							Thread.sleep(10);

							Set<PutRecordsRequestEntry> batch = new HashSet<>();
							int batchEnd = Math.min(count + batchSize, TOTAL_EVENT_COUNT);
							for (int i = count; i < batchEnd; i++) {
								batch.add(
									new PutRecordsRequestEntry()
										.withData(ByteBuffer.wrap((i + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes()))
										.withPartitionKey(UUID.randomUUID().toString()));
							}
							count = batchEnd;

							PutRecordsResult result = producerClient.putRecords(
								new PutRecordsRequest().withStreamName(streamName).withRecords(batch));

							// the putRecords() operation may have failing records; to keep this test simple
							// instead of retrying on failed records, we simply pass on a runtime exception
							// and let this test fail
							if (result.getFailedRecordCount() > 0) {
								producerError.set(new RuntimeException("The producer has failed records in one of the put batch attempts."));
								return;
							}
						}
					} catch (InterruptedException e) {
						// The main thread interrupts us when it stops waiting (deadline passed or an error was seen).
						// That is not a producer failure. Recording it would make main report "Producer failed"
						// and hide the consumer's error, so only restore the interrupt flag and stop.
						Thread.currentThread().interrupt();
					} catch (Exception e) {
						// Stop at the first error. Continuing would skip the records of the failed batch
						// and overwrite the recorded error.
						producerError.set(e);
					} finally {
						producerClient.shutdown();
					}
				}
			};
			Thread producerThread = new Thread(manualGenerate);
			producerThread.start();

			final AtomicReference<Throwable> consumerError = new AtomicReference<>();
			Thread consumerThread = ExactlyOnceValidatingConsumerThread.create(
				TOTAL_EVENT_COUNT, 10000, 2, 500, 500,
				accessKey, secretKey, region, streamName,
				consumerError, flinkPort, flinkConfig);
			consumerThread.start();

			// reshard the Kinesis stream while the producer / and consumers are running
			Runnable splitShard = new Runnable() {
				@Override
				public void run() {
					try {
						// first, split shard in the middle of the hash range
						Thread.sleep(5000);
						LOG.info("Splitting shard ...");
						client.splitShard(
							streamName,
							KinesisShardIdGenerator.generateFromShardOrder(0),
							"170141183460469231731687303715884105727");

						// wait until the split shard operation finishes updating ...
						DescribeStreamResult shardStatus;
						Random rand = new Random();
						do {
							shardStatus = null;
							while (shardStatus == null) {
								// retry until we get status
								try {
									shardStatus = client.describeStream(streamName);
								} catch (LimitExceededException lee) {
									LOG.warn("LimitExceededException while describing stream ... retrying ...");
									Thread.sleep(rand.nextInt(1200));
								}
							}
						} while (!shardStatus.getStreamDescription().getStreamStatus().equals("ACTIVE"));

						// then merge again
						Thread.sleep(7000);
						LOG.info("Merging shards ...");
						client.mergeShards(
							streamName,
							KinesisShardIdGenerator.generateFromShardOrder(1),
							KinesisShardIdGenerator.generateFromShardOrder(2));
					} catch (InterruptedException iex) {
						// the test is shutting down; keep the interrupt status visible and stop resharding
						Thread.currentThread().interrupt();
					}
				}
			};
			Thread splitShardThread = new Thread(splitShard);
			splitShardThread.start();

			boolean deadlinePassed = false;
			long deadline = System.currentTimeMillis() + (1000 * 5 * 60); // wait at most for five minutes
			// wait until both producer and consumer finishes, or an unexpected error is thrown
			while ((consumerThread.isAlive() || producerThread.isAlive()) &&
					(producerError.get() == null && consumerError.get() == null)) {
				Thread.sleep(1000);
				if (System.currentTimeMillis() >= deadline) {
					LOG.warn("Deadline passed");
					deadlinePassed = true;
					break; // enough waiting
				}
			}

			if (producerThread.isAlive()) {
				producerThread.interrupt();
			}

			if (consumerThread.isAlive()) {
				consumerThread.interrupt();
			}

			if (splitShardThread.isAlive()) {
				// stop resharding before the stream is deleted in the finally block
				splitShardThread.interrupt();
			}

			if (producerError.get() != null) {
				LOG.info("+++ TEST failed! +++");
				throw new RuntimeException("Producer failed", producerError.get());
			}

			if (consumerError.get() != null) {
				LOG.info("+++ TEST failed! +++");
				throw new RuntimeException("Consumer failed", consumerError.get());
			}

			if (!deadlinePassed) {
				LOG.info("+++ TEST passed! +++");
			} else {
				LOG.info("+++ TEST failed! +++");
			}

		} finally {
			// nested so that a failing deleteStream() still shuts down the client and stops flink
			try {
				client.deleteStream(streamName);
			} finally {
				client.shutdown();

				// stopping flink
				flink.stop();
			}
		}
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java new file mode 100644 index 0000000..35e9ef6 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kinesis.manualtests; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; +import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; + +import java.nio.ByteBuffer; +import java.util.Properties; + +/** + * This is a manual test for the AWS Kinesis connector in Flink. + * + * It uses: + * - A custom KinesisSerializationSchema + * - A custom KinesisPartitioner + * + * The streams "test-flink" and "flink-test-2" must exist. 
+ * + * Invocation: + * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX + */
public class ManualProducerTest {

	/**
	 * Builds and executes a job that writes the events of {@link ProduceIntoKinesis.EventsGenerator}
	 * to Kinesis through a {@link FlinkKinesisProducer} configured with a custom serialization schema
	 * and a custom partitioner.
	 *
	 * @param args requires {@code --region}, {@code --accessKey} and {@code --secretKey}
	 */
	public static void main(String[] args) throws Exception {
		ParameterTool params = ParameterTool.fromArgs(args);

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4);

		DataStream<String> events = env.addSource(new ProduceIntoKinesis.EventsGenerator());

		Properties producerConfig = new Properties();
		producerConfig.setProperty(ProducerConfigConstants.AWS_REGION, params.getRequired("region"));
		producerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, params.getRequired("accessKey"));
		producerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, params.getRequired("secretKey"));

		// Elements whose first '-'-separated token ends in '0' go to "flink-test-2". That is every 10th
		// element, assuming the generator prefixes a running counter. All other elements return a null
		// target and therefore go to the default stream.
		KinesisSerializationSchema<String> schema = new KinesisSerializationSchema<String>() {
			@Override
			public ByteBuffer serialize(String element) {
				byte[] payload = element.getBytes();
				return ByteBuffer.wrap(payload);
			}

			@Override
			public String getTargetStream(String element) {
				String prefix = element.split("-")[0];
				return prefix.endsWith("0") ? "flink-test-2" : null;
			}
		};

		FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(schema, producerConfig);
		producer.setFailOnError(true);
		producer.setDefaultStream("test-flink");
		producer.setDefaultPartition("0");
		// the partition id is the element's last character
		producer.setCustomPartitioner(new KinesisPartitioner<String>() {
			@Override
			public String getPartitionId(String element) {
				return element.substring(element.length() - 1);
			}
		});

		events.addSink(producer);
		env.execute();
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java new file mode 100644 index 0000000..157964c --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.connectors.kinesis.testutils;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.BitSet;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.test.util.TestUtils.tryExecute;

/**
 * A thread that runs a topology with the FlinkKinesisConsumer as source, followed by two flat map
 * functions, one that performs artificial failures and another that validates the exactly-once guarantee.
 */
public class ExactlyOnceValidatingConsumerThread {

	private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingConsumerThread.class);

	/**
	 * Creates, but does not start, a thread that submits the validating consumer topology to the
	 * cluster at {@code localhost:flinkPort}. The topology reads the stream from TRIM_HORIZON.
	 *
	 * @param totalEventCount number of distinct ids ({@code 0 .. totalEventCount - 1}) that must each be seen exactly once
	 * @param failAtRecordCount number of records each failing-mapper subtask passes through before it throws on the first attempt
	 * @param parallelism default parallelism of the job (the validator itself always runs with parallelism 1)
	 * @param checkpointInterval value passed to {@code enableCheckpointing}
	 * @param restartDelay delay passed to {@code RestartStrategies.fixedDelayRestart} (two restart attempts)
	 * @param awsAccessKey AWS access key id for the consumer
	 * @param awsSecretKey AWS secret access key for the consumer
	 * @param awsRegion AWS region of the stream
	 * @param kinesisStreamName name of the stream to consume
	 * @param errorHandler receives any {@link Throwable} raised while running the topology
	 * @param flinkPort RPC port of the cluster on localhost
	 * @param flinkConfig configuration passed to the remote environment
	 * @return the unstarted thread
	 */
	public static Thread create(final int totalEventCount,
			final int failAtRecordCount,
			final int parallelism,
			final int checkpointInterval,
			final long restartDelay,
			final String awsAccessKey,
			final String awsSecretKey,
			final String awsRegion,
			final String kinesisStreamName,
			final AtomicReference<Throwable> errorHandler,
			final int flinkPort,
			final Configuration flinkConfig) {
		Runnable exactlyOnceValidationConsumer = new Runnable() {
			@Override
			public void run() {
				try {
					StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort, flinkConfig);
					see.setParallelism(parallelism);
					see.enableCheckpointing(checkpointInterval);
					// we restart two times
					see.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, restartDelay));

					// consuming topology
					Properties consumerProps = new Properties();
					consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey);
					consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey);
					consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, awsRegion);
					// start reading from beginning
					consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
					DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>(kinesisStreamName, new SimpleStringSchema(), consumerProps));
					consuming
						.flatMap(new ArtificialFailOnceFlatMapper(failAtRecordCount))
						// validate consumed records for correctness (use only 1 instance to validate all consumed records)
						.flatMap(new ExactlyOnceValidatingMapper(totalEventCount)).setParallelism(1);

					LOG.info("Starting consuming topology");
					tryExecute(see, "Consuming topo");
					LOG.info("Consuming topo finished");
				} catch (Throwable t) {
					// Catch Throwable rather than Exception. An Error escaping run() would end this thread
					// without being recorded in errorHandler, hiding the failure from whoever polls it.
					// NOTE(review): test helpers such as tryExecute may report failure via an AssertionError.
					LOG.warn("Error while running consuming topology", t);
					errorHandler.set(t);
				}
			}
		};

		return new Thread(exactlyOnceValidationConsumer);
	}

	/**
	 * Checks that every id in {@code [0, totalEventCount)} is seen exactly once. A record's id is the
	 * integer before its first '-'.
	 *
	 * <p>The set of seen ids is the operator's checkpointed state. Throws {@link SuccessException} once
	 * all ids have been seen.
	 */
	private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String, String>, Checkpointed<BitSet> {

		private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingMapper.class);

		private final int totalEventCount;
		private BitSet validator;

		public ExactlyOnceValidatingMapper(int totalEventCount) {
			this.totalEventCount = totalEventCount;
			this.validator = new BitSet(totalEventCount);
		}

		@Override
		public void flatMap(String value, Collector<String> out) throws Exception {
			LOG.info("Consumed {}", value);

			int id = Integer.parseInt(value.split("-")[0]);
			// Validate the range first. Otherwise an out-of-range id could be reported as a duplicate,
			// or recorded in the BitSet before being rejected.
			if (id > totalEventCount - 1) {
				throw new RuntimeException("Out of bounds ID observed");
			}
			if (validator.get(id)) {
				throw new RuntimeException("Saw id " + id + " twice!");
			}
			validator.set(id);

			// the first unset bit is at totalEventCount only when every id 0 .. totalEventCount - 1 has been seen
			if (validator.nextClearBit(0) == totalEventCount) {
				throw new SuccessException();
			}
		}

		@Override
		public BitSet snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
			return validator;
		}

		@Override
		public void restoreState(BitSet state) throws Exception {
			this.validator = state;
		}
	}

	/**
	 * Forwards every record unchanged, except during the first execution attempt (attempt number 0).
	 * In that attempt, a subtask throws on the {@code failAtRecordCount + 1}-th record it receives,
	 * forcing a restart.
	 */
	private static class ArtificialFailOnceFlatMapper extends RichFlatMapFunction<String, String> {

		// records seen by this subtask; not checkpointed, so it starts again from 0 after a restart
		private int count = 0;

		private final int failAtRecordCount;

		public ArtificialFailOnceFlatMapper(int failAtRecordCount) {
			this.failAtRecordCount = failAtRecordCount;
		}

		@Override
		public void flatMap(String value, Collector<String> out) throws Exception {
			if (count++ >= failAtRecordCount && getRuntimeContext().getAttemptNumber() == 0) {
				throw new RuntimeException("Artificial failure. Restart please.");
			}
			out.collect(value);
		}
	}
}
