Repository: flink Updated Branches: refs/heads/master 78d9ae9ba -> b7d83899a
[FLINK-4514][kinesis-connector] Handle unexpected ExpiredIteratorExceptions This closes #2432 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d83899 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d83899 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d83899 Branch: refs/heads/master Commit: b7d83899abfe8175b1fc9e526b6afb2ca7a056ed Parents: 78d9ae9 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon Aug 29 17:30:39 2016 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue Aug 30 23:50:40 2016 +0800 ---------------------------------------------------------------------- .../kinesis/config/ConsumerConfigConstants.java | 7 ++ .../kinesis/internals/ShardConsumer.java | 74 +++++++++++++++----- .../kinesis/util/KinesisConfigUtil.java | 30 +++++--- .../kinesis/internals/ShardConsumerTest.java | 40 +++++++++++ .../testutils/FakeKinesisBehavioursFactory.java | 66 +++++++++++++++-- 5 files changed, 187 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 28ff3e4..76c20ed 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis.config; import com.amazonaws.services.kinesis.model.ShardIteratorType; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; /** @@ -128,4 +129,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants { public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L; + /** + * To avoid shard iterators expiring in {@link ShardConsumer}s, the configured getRecords + * interval must be lower than 5 minutes, which is the expiry time for retrieved iterators. + */ + public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L; + } http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 6e24e65..612a4a7 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals; import 
com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ShardIteratorType; @@ -29,6 +30,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.math.BigInteger; @@ -44,6 +47,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ShardConsumer<T> implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class); + private final KinesisDeserializationSchema<T> deserializer; private final KinesisProxyInterface kinesis; @@ -133,7 +138,7 @@ public class ShardConsumer<T> implements Runnable { kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); // get only the last aggregated record - GetRecordsResult getRecordsResult = kinesis.getRecords(itrForLastAggregatedRecord, 1); + GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1); List<UserRecord> fetchedRecords = deaggregateRecords( getRecordsResult.getRecords(), @@ -168,7 +173,7 @@ public class ShardConsumer<T> implements Runnable { Thread.sleep(fetchIntervalMillis); } - GetRecordsResult getRecordsResult = kinesis.getRecords(nextShardItr, maxNumberOfRecordsPerFetch); + GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch); // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding 
List<UserRecord> fetchedRecords = deaggregateRecords( @@ -199,11 +204,15 @@ public class ShardConsumer<T> implements Runnable { } /** - * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. + * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last + * successfully collected sequence number in this shard consumer is also updated so that + * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard + * iterators if necessary. + * * Note that the server-side Kinesis timestamp is attached to the record when collected. When the * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default. * - * @param record + * @param record record to deserialize and collect * @throws IOException */ private void deserializeRecordForCollectionAndUpdateState(UserRecord record) @@ -223,19 +232,52 @@ public class ShardConsumer<T> implements Runnable { subscribedShard.getStreamName(), subscribedShard.getShard().getShardId()); - if (record.isAggregated()) { - fetcherRef.emitRecordAndUpdateState( - value, - approxArrivalTimestamp, - subscribedShardStateIndex, - new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())); - } else { - fetcherRef.emitRecordAndUpdateState( - value, - approxArrivalTimestamp, - subscribedShardStateIndex, - new SequenceNumber(record.getSequenceNumber())); + SequenceNumber collectedSequenceNumber = (record.isAggregated()) + ? 
new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()) + : new SequenceNumber(record.getSequenceNumber()); + + fetcherRef.emitRecordAndUpdateState( + value, + approxArrivalTimestamp, + subscribedShardStateIndex, + collectedSequenceNumber); + + lastSequenceNum = collectedSequenceNumber; + } + + /** + * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected + * AWS {@link ExpiredIteratorException}s by refreshing the shard iterator and retrying, so that + * such occasions do not fail the consumer. The returned shard iterator within the successful {@link GetRecordsResult} should + * be used for the next call to this method. + * + * Note: it is important that this method is not called again before all the records from the last result have been + * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise + * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to + * incorrect shard iteration if the iterator had to be refreshed. 
+ * + * @param shardItr shard iterator to use + * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt + * @return get records result + * @throws InterruptedException + */ + private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { + GetRecordsResult getRecordsResult = null; + while (getRecordsResult == null) { + try { + getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); + } catch (ExpiredIteratorException eiEx) { + LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + + " refreshing the iterator ...", shardItr, subscribedShard); + shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); + + // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator + if (fetchIntervalMillis != 0) { + Thread.sleep(fetchIntervalMillis); + } + } } + return getRecordsResult; } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index d9d553b..9aa14ad 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConsta import java.util.Properties; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -65,13 +66,13 @@ public class KinesisConfigUtil { "Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value."); validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, - "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value"); + "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value."); validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, - "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value"); + "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value."); validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, - "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value"); + "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value."); validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value."); @@ -80,25 +81,34 @@ public class KinesisConfigUtil { "Invalid value given for maximum retry attempts for getShardIterator shard operation. 
Must be a valid non-negative integer value."); validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, - "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value"); + "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value."); validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, - "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value"); + "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value."); validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, - "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value"); + "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value."); validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, - "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value"); + "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value."); validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, - "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value"); + "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value."); validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, - "Invalid value given for describe stream operation max backoff milliseconds. 
Must be a valid non-negative long value"); + "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value."); validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, - "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value"); + "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value."); + + if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) { + checkArgument( + Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) + < ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS, + "Invalid value given for getRecords sleep interval in milliseconds. Must be lower than " + + ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds." + ); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java index 5b3e1a5..96764a4 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java @@ -79,4 +79,44 @@ public class ShardConsumerTest { 
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())); } + @Test + public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() { + KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard( + "fakeStream", + new Shard() + .withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)) + .withHashKeyRange( + new HashKeyRange() + .withStartingHashKey("0") + .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString()))); + + LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>(); + subscribedShardsStateUnderTest.add( + new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState"))); + + TestableKinesisDataFetcher fetcher = + new TestableKinesisDataFetcher( + Collections.singletonList("fakeStream"), + new Properties(), + 10, + 2, + new AtomicReference<Throwable>(), + subscribedShardsStateUnderTest, + KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")), + Mockito.mock(KinesisProxyInterface.class)); + + new ShardConsumer<>( + fetcher, + 0, + subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(), + subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(), + // Get a total of 1000 records with 9 getRecords() calls, + // and the 7th getRecords() call will encounter an unexpected expired shard iterator + FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7)).run(); + + assertTrue(fetcher.getNumOfElementsCollected() == 1000); + assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals( + SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java index fc98fca..65e6d4e 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Shard; @@ -33,13 +34,15 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyInterface} interface. 
*/ public class FakeKinesisBehavioursFactory { // ------------------------------------------------------------------------ - // Behaviours related to shard listing and resharding, used in ShardDiscovererTest + // Behaviours related to shard listing and resharding, used in KinesisDataFetcherTest // ------------------------------------------------------------------------ public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() { @@ -75,14 +78,69 @@ public class FakeKinesisBehavioursFactory { public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(final int numOfRecords, final int numOfGetRecordsCalls) { return new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls); } + + public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator( + final int numOfRecords, final int numOfGetRecordsCall, final int orderOfCallToExpire) { + return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis( + numOfRecords, numOfGetRecordsCall, orderOfCallToExpire); + } + + public static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis { + + private boolean expiredOnceAlready = false; + private boolean expiredIteratorRefreshed = false; + private final int orderOfCallToExpire; + + public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(final int numOfRecords, + final int numOfGetRecordsCalls, + final int orderOfCallToExpire) { + super(numOfRecords, numOfGetRecordsCalls); + checkArgument(orderOfCallToExpire <= numOfGetRecordsCalls, + "can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls"); + this.orderOfCallToExpire = orderOfCallToExpire; + } + + @Override + public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { + if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) { + // we fake 
only once the expired iterator exception at the specified get records attempt order + expiredOnceAlready = true; + throw new ExpiredIteratorException("Artificial expired shard iterator"); + } else if (expiredOnceAlready && !expiredIteratorRefreshed) { + // if we've thrown the expired iterator exception already, but the iterator was not refreshed, + // throw a hard exception to the test that is testing this Kinesis behaviour + throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call"); + } else { + // assuming that the maxRecordsToGet is always large enough + return new GetRecordsResult() + .withRecords(shardItrToRecordBatch.get(shardIterator)) + .withNextShardIterator( + (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1) + ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null + } + } + + @Override + public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) { + if (!expiredOnceAlready) { + // for the first call, just return the iterator of the first batch of records + return "0"; + } else { + // fake the iterator refresh when this is called again after getRecords throws expired iterator + // exception on the orderOfCallToExpire attempt + expiredIteratorRefreshed = true; + return String.valueOf(orderOfCallToExpire-1); + } + } + } private static class SingleShardEmittingFixNumOfRecordsKinesis implements KinesisProxyInterface { - private final int totalNumOfGetRecordsCalls; + protected final int totalNumOfGetRecordsCalls; - private final int totalNumOfRecords; + protected final int totalNumOfRecords; - private final Map<String,List<Record>> shardItrToRecordBatch; + protected final Map<String,List<Record>> shardItrToRecordBatch; public SingleShardEmittingFixNumOfRecordsKinesis(final int numOfRecords, final int numOfGetRecordsCalls) { this.totalNumOfRecords = numOfRecords;
