http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java deleted file mode 100644 index a06fdca..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ /dev/null @@ -1,679 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package org.apache.flink.streaming.connectors.kinesis.internals;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import java.util.LinkedList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
 * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
 * <ul>
 *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
 *            of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
 *            subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
 *            to the same subset of shards even after restoring)</li>
 *     <li>2. decide where in each discovered shard the fetcher should start subscribing from</li>
 *     <li>3. subscribe to shards by creating a single thread for each shard</li>
 * </ul>
 *
 * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
 * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
 * by multiple threads, these operations should only be done using the handler methods provided in this class.
 */
public class KinesisDataFetcher<T> {

	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);

	// ------------------------------------------------------------------------
	//  Consumer-wide settings
	// ------------------------------------------------------------------------

	/** Configuration properties for the Flink Kinesis Consumer */
	private final Properties configProps;

	/** The list of Kinesis streams that the consumer is subscribing to */
	private final List<String> streams;

	/**
	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
	 * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
	 * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
	 */
	private final KinesisDeserializationSchema<T> deserializationSchema;

	// ------------------------------------------------------------------------
	//  Subtask-specific settings
	// ------------------------------------------------------------------------

	/** Runtime context of the subtask that this fetcher was created in */
	private final RuntimeContext runtimeContext;

	/** Parallelism of the consumer, read once from the runtime context */
	private final int totalNumberOfConsumerSubtasks;

	/** Index of the subtask running this fetcher, read once from the runtime context */
	private final int indexOfThisConsumerSubtask;

	/**
	 * This flag should be set by {@link FlinkKinesisConsumer} using
	 * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
	 */
	private boolean isRestoredFromFailure;

	// ------------------------------------------------------------------------
	//  Executor services to run created threads
	// ------------------------------------------------------------------------

	/** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
	private final ExecutorService shardConsumersExecutor;

	// ------------------------------------------------------------------------
	//  Managed state, accessed and updated across multiple threads
	// ------------------------------------------------------------------------

	/**
	 * The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards.
	 * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
	 */
	private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;

	/**
	 * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The fetcher
	 * will add new subscribed shard states to this list as it discovers new shards. {@link ShardConsumer} threads update
	 * the last processed sequence number of subscribed shards as they fetch and process records.
	 *
	 * <p>Note that since multiple {@link ShardConsumer} threads will be performing operations on this list, all operations
	 * must be wrapped in synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose,
	 * all threads must use the following thread-safe methods this class provides to operate on this list:
	 * <ul>
	 *     <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
	 *     <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li>
	 *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, int, SequenceNumber)}</li>
	 * </ul>
	 */
	private final List<KinesisStreamShardState> subscribedShardsState;

	/** Context of the source function; records and watermarks are emitted through it */
	private final SourceFunction.SourceContext<T> sourceContext;

	/** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
	private final Object checkpointLock;

	/** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
	private final AtomicReference<Throwable> error;

	/** The Kinesis proxy that the fetcher will be using to discover new shards */
	private final KinesisProxyInterface kinesis;

	/**
	 * Thread that executed runFetcher(); null until runFetcher() is entered.
	 *
	 * <p>Volatile because it is read by {@link KinesisDataFetcher#shutdownFetcher()}, which may be invoked from
	 * other threads (e.g. a {@link ShardConsumer} reporting an error through stopWithError()).
	 */
	private volatile Thread mainThread;

	/**
	 * The current number of shards that are actively read by this fetcher.
	 *
	 * <p>This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
	 * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
	 */
	private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);

	/** Cleared by shutdownFetcher() to make the discovery loop in runFetcher() exit */
	private volatile boolean running = true;

	/**
	 * Creates a Kinesis Data Fetcher.
	 *
	 * @param streams the streams to subscribe to
	 * @param sourceContext context of the source function
	 * @param runtimeContext this subtask's runtime context
	 * @param configProps the consumer configuration properties
	 * @param deserializationSchema deserialization schema
	 */
	public KinesisDataFetcher(List<String> streams,
							SourceFunction.SourceContext<T> sourceContext,
							RuntimeContext runtimeContext,
							Properties configProps,
							KinesisDeserializationSchema<T> deserializationSchema) {
		this(streams,
			sourceContext,
			sourceContext.getCheckpointLock(),
			runtimeContext,
			configProps,
			deserializationSchema,
			new AtomicReference<Throwable>(),
			new LinkedList<KinesisStreamShardState>(),
			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
			KinesisProxy.create(configProps));
	}

	/** This constructor is exposed for testing purposes */
	protected KinesisDataFetcher(List<String> streams,
								SourceFunction.SourceContext<T> sourceContext,
								Object checkpointLock,
								RuntimeContext runtimeContext,
								Properties configProps,
								KinesisDeserializationSchema<T> deserializationSchema,
								AtomicReference<Throwable> error,
								LinkedList<KinesisStreamShardState> subscribedShardsState,
								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
								KinesisProxyInterface kinesis) {
		this.streams = checkNotNull(streams);
		this.configProps = checkNotNull(configProps);
		this.sourceContext = checkNotNull(sourceContext);
		this.checkpointLock = checkNotNull(checkpointLock);
		this.runtimeContext = checkNotNull(runtimeContext);
		this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
		this.deserializationSchema = checkNotNull(deserializationSchema);
		this.kinesis = checkNotNull(kinesis);

		this.error = checkNotNull(error);
		this.subscribedShardsState = checkNotNull(subscribedShardsState);
		this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);

		this.shardConsumersExecutor =
			createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
	}

	/**
	 * Starts the fetcher. After starting the fetcher, it can only
	 * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
	 *
	 * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
	 */
	public void runFetcher() throws Exception {

		// check that we are running before proceeding
		if (!running) {
			return;
		}

		this.mainThread = Thread.currentThread();

		// ------------------------------------------------------------------------
		//  Procedures before starting the infinite while loop:
		// ------------------------------------------------------------------------

		//  1. query for any new shards that may have been created while the Kinesis consumer was not running,
		//     and register them to the subscribedShardState list.
		if (LOG.isDebugEnabled()) {
			String logFormat = (!isRestoredFromFailure)
				? "Subtask {} is trying to discover initial shards ..."
				: "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " +
				"running due to failure ...";

			LOG.debug(logFormat, indexOfThisConsumerSubtask);
		}
		List<KinesisStreamShard> newShardsCreatedWhileNotRunning = discoverNewShardsToSubscribe();
		if (!newShardsCreatedWhileNotRunning.isEmpty()) {
			// the starting state is the same for every shard discovered here, so it is determined only once
			SentinelSequenceNumber startingStateForNewShard = getStartingStateForShardsCreatedWhileNotRunning();

			for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
				if (LOG.isInfoEnabled()) {
					String logFormat = (!isRestoredFromFailure)
						? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
						: "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " +
						"running due to failure, starting state set as sequence number {}";

					LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get());
				}
				registerNewSubscribedShardState(new KinesisStreamShardState(shard, startingStateForNewShard.get()));
			}
		}

		//  2. check that there is at least one shard in the subscribed streams to consume from (can be done by
		//     checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
		boolean hasShards = false;
		StringBuilder streamsWithNoShardsFound = new StringBuilder();
		for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
			if (streamToLastDiscoveredShardEntry.getValue() != null) {
				hasShards = true;
			} else {
				streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
			}
		}

		if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
			LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
				indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
		}

		if (!hasShards) {
			throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
		}

		//  3. start consuming any shard state we already have in the subscribedShardState up to this point; the
		//     subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
		//     consumer using a restored state checkpoint
		for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
			KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);

			// only start a consuming thread if the seeded subscribed shard has not been completely read already
			if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {

				if (LOG.isInfoEnabled()) {
					LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
						indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
						seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
				}

				shardConsumersExecutor.submit(
					new ShardConsumer<>(
						this,
						seededStateIndex,
						seededShardState.getKinesisStreamShard(),
						seededShardState.getLastProcessedSequenceNum()));
			}
		}

		// ------------------------------------------------------------------------

		// finally, start the infinite shard discovery and consumer launching loop;
		// we will escape from this loop only when shutdownFetcher() or stopWithError() is called

		final long discoveryIntervalMillis = Long.parseLong(
			configProps.getProperty(
				ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));

		// FLINK-4341:
		// For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
		// for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
		// the downstream watermarks would not advance, leading to unbounded accumulating state.
		//
		// The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
		// is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
		// will be messed up.
		//
		// There are 2 cases where we need to either emit a max value watermark, or deliberately fail hard:
		//   (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
		//       value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
		//       due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer
		//       was told to start from TRIM_HORIZON, or 3) there were initially no shards for this subtask to read on startup.
		//   (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
		//       a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
		//       will be subscribed by this subtask after restore as initial shards on startup.
		//
		// TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
		// Please see FLINK-4341 for more detail

		boolean emittedMaxValueWatermark = false;

		if (this.numberOfActiveShards.get() == 0) {
			// FLINK-4341 workaround case (a) - please see the above for details on this case
			LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
				indexOfThisConsumerSubtask);
			emitMaxValueWatermark();
			emittedMaxValueWatermark = true;
		}

		while (running) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
					indexOfThisConsumerSubtask);
			}
			List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();

			// -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
			// Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
			// a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
			// may not correctly reflect the discover result in the below case determination. This may lead to incorrect
			// case determination on the current discovery attempt, but can still be correctly handled on future attempts.
			//
			// Although this can be resolved by wrapping the current shard discovery attempt with the below
			// case determination within a synchronized block on the checkpoint lock for atomicity, there will be
			// considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
			// since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
			// we can still eventually handle max value watermark emitting / deliberately failing on successive
			// discovery attempts.

			if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
				// FLINK-4341 workaround case (a) - please see the above for details on this case
				LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
					indexOfThisConsumerSubtask);
				emitMaxValueWatermark();
				emittedMaxValueWatermark = true;
			} else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
				// FLINK-4341 workaround case (b) - please see the above for details on this case
				//
				// Note that in the case where on resharding this subtask ceased to read all of its previous shards
				// but new shards are also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
				// will be false; this allows the fetcher to continue reading the new shards without failing on such cases.
				// However, due to the race condition mentioned above, we might still fall into case (a) first, and
				// then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
				// watermark emitting still remains to be correct.

				LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
						" up watermarks; the new shards will be subscribed by this subtask after restore ...",
					indexOfThisConsumerSubtask, newShardsDueToResharding.size());
				throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
			}

			for (KinesisStreamShard shard : newShardsDueToResharding) {
				// since there may be delay in discovering a new shard, all new shards due to
				// resharding should be read starting from the earliest record possible
				KinesisStreamShardState newShardState =
					new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
				int newStateIndex = registerNewSubscribedShardState(newShardState);

				if (LOG.isInfoEnabled()) {
					LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
							"the shard from sequence number {} with ShardConsumer {}",
						indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(),
						newShardState.getLastProcessedSequenceNum(), newStateIndex);
				}

				shardConsumersExecutor.submit(
					new ShardConsumer<>(
						this,
						newStateIndex,
						newShardState.getKinesisStreamShard(),
						newShardState.getLastProcessedSequenceNum()));
			}

			// we also check if we are running here so that we won't start the discovery sleep
			// interval if the running flag was set to false during the middle of the while loop
			if (running && discoveryIntervalMillis != 0) {
				try {
					Thread.sleep(discoveryIntervalMillis);
				} catch (InterruptedException iex) {
					// the sleep may be interrupted by shutdownFetcher(); the loop condition re-checks the running flag
				}
			}
		}

		// make sure all resources have been terminated before leaving
		awaitTermination();

		// any error thrown in the shard consumer threads will be thrown to the main thread
		Throwable throwable = this.error.get();
		if (throwable != null) {
			if (throwable instanceof Exception) {
				throw (Exception) throwable;
			} else if (throwable instanceof Error) {
				throw (Error) throwable;
			} else {
				throw new Exception(throwable);
			}
		}
	}

	/**
	 * Determines the starting state for shards discovered before the discovery loop of runFetcher() starts.
	 *
	 * <p>When starting fresh (not restoring from a checkpoint), all existing shards of the subscribed streams are new
	 * shards and start from the configured {@link ConsumerConfigConstants#STREAM_INITIAL_POSITION}. When restoring
	 * from a failure, any new shards are the result of Kinesis resharding since the checkpoint and are read from
	 * the earliest record.
	 *
	 * @return the sentinel sequence number that newly seeded shard states should start from
	 */
	private SentinelSequenceNumber getStartingStateForShardsCreatedWhileNotRunning() {
		// parsed even when restoring, so that an invalid configured value is still reported
		InitialPosition initialPosition = InitialPosition.valueOf(configProps.getProperty(
			ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION));

		return (isRestoredFromFailure)
			? SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
			: initialPosition.toSentinelSequenceNumber();
	}

	/**
	 * Emits a {@code Long.MAX_VALUE} watermark (FLINK-4341 workaround case (a)).
	 *
	 * <p>Emission is done while holding the checkpoint lock, the same lock used when records are emitted in
	 * {@link KinesisDataFetcher#emitRecordAndUpdateState(Object, long, int, SequenceNumber)}, so that the
	 * watermark is never emitted concurrently with a record from a {@link ShardConsumer} thread.
	 */
	private void emitMaxValueWatermark() {
		synchronized (checkpointLock) {
			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
		}
	}

	/**
	 * Creates a snapshot of the current last processed sequence numbers of each subscribed shard.
	 * The caller must hold the checkpoint lock.
	 *
	 * @return state snapshot
	 */
	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
		// this method assumes that the checkpoint lock is held
		assert Thread.holdsLock(checkpointLock);

		HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
		for (KinesisStreamShardState shardWithState : subscribedShardsState) {
			stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
		}
		return stateSnapshot;
	}

	/**
	 * Starts shutting down the fetcher. Must be called to allow {@link KinesisDataFetcher#runFetcher()} to complete.
	 * Once called, the shutdown procedure will be executed and all shard consuming threads will be interrupted.
	 *
	 * <p>Safe to call before {@link KinesisDataFetcher#runFetcher()} has been entered; in that case there is no
	 * main thread to interrupt yet, and a later call to runFetcher() returns immediately.
	 */
	public void shutdownFetcher() {
		running = false;

		// the main thread may be sleeping for the discovery interval; it is null if runFetcher() was never entered
		Thread fetcherMainThread = this.mainThread;
		if (fetcherMainThread != null) {
			fetcherMainThread.interrupt();
		}

		if (LOG.isInfoEnabled()) {
			LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
		}
		shardConsumersExecutor.shutdownNow();
	}

	/**
	 * After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown.
	 * Polls the shard consumer executor every 50 ms until it reports termination.
	 *
	 * @throws InterruptedException if the waiting thread is interrupted
	 */
	public void awaitTermination() throws InterruptedException {
		while (!shardConsumersExecutor.isTerminated()) {
			Thread.sleep(50);
		}
	}

	/**
	 * Called by created threads to pass on errors. Only the first thrown error is set.
	 * Once set, the shutdown process will be executed and all shard consuming threads will be interrupted.
	 */
	protected void stopWithError(Throwable throwable) {
		if (this.error.compareAndSet(null, throwable)) {
			shutdownFetcher();
		}
	}

	// ------------------------------------------------------------------------
	//  Functions that update the subscribedStreamToLastDiscoveredShardIds state
	// ------------------------------------------------------------------------

	/** Updates the last discovered shard of a subscribed stream; only updates if the update is valid */
	public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
		String lastSeenShardIdOfStream = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);

		// the update is valid only if the given shard id is greater
		// than the previous last seen shard id of the stream
		if (lastSeenShardIdOfStream == null) {
			// if not previously set, simply put as the last seen shard id
			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
		} else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
		}
	}

	/**
	 * A utility function that does the following:
	 *
	 * <p>1. Find new shards for each stream that we haven't seen before
	 * 2. For each new shard, determine whether this consumer subtask should subscribe to them;
	 * 	  if yes, it is added to the returned list of shards
	 * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
	 *    that we have already seen before the next time this function is called
	 */
	private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {

		List<KinesisStreamShard> newShardsToSubscribe = new LinkedList<>();

		GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
		if (shardListResult.hasRetrievedShards()) {
			Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();

			for (String stream : streamsWithNewShards) {
				List<KinesisStreamShard> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
				for (KinesisStreamShard newShard : newShardsOfStream) {
					if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
						newShardsToSubscribe.add(newShard);
					}
				}

				advanceLastDiscoveredShardOfStream(
					stream, shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
			}
		}

		return newShardsToSubscribe;
	}

	// ------------------------------------------------------------------------
	//  Functions to get / set information about the consumer
	// ------------------------------------------------------------------------

	public void setIsRestoringFromFailure(boolean bool) {
		this.isRestoredFromFailure = bool;
	}

	protected Properties getConsumerConfiguration() {
		return configProps;
	}

	/**
	 * Returns a copy of the deserialization schema, cloned with the subtask's user code class loader,
	 * so that each {@link ShardConsumer} can use its own instance.
	 */
	protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
		try {
			return InstantiationUtil.clone(deserializationSchema, runtimeContext.getUserCodeClassLoader());
		} catch (IOException | ClassNotFoundException ex) {
			// this really shouldn't happen; simply wrap it around a runtime exception
			throw new RuntimeException(ex);
		}
	}

	// ------------------------------------------------------------------------
	//  Thread-safe operations for record emitting and shard state updating
	//  that assure atomicity with respect to the checkpoint lock
	// ------------------------------------------------------------------------

	/**
	 * Atomic operation to collect a record and update state to the sequence number of the record.
	 * This method is called by {@link ShardConsumer}s.
	 *
	 * @param record the record to collect
	 * @param recordTimestamp timestamp to attach to the collected record
	 * @param shardStateIndex index of the shard to update in subscribedShardsState;
	 *                        this index should be the returned value from
	 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
	 *                        when the shard state was registered.
	 * @param lastSequenceNumber the last sequence number value to update
	 */
	protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
		synchronized (checkpointLock) {
			sourceContext.collectWithTimestamp(record, recordTimestamp);
			updateState(shardStateIndex, lastSequenceNumber);
		}
	}

	/**
	 * Update the shard to last processed sequence number state.
	 * This method is called by {@link ShardConsumer}s.
	 *
	 * @param shardStateIndex index of the shard to update in subscribedShardsState;
	 *                        this index should be the returned value from
	 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
	 *                        when the shard state was registered.
	 * @param lastSequenceNumber the last sequence number value to update
	 */
	protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
		synchronized (checkpointLock) {
			subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);

			// if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
			// we've finished reading the shard and should determine it to be non-active
			if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
				this.numberOfActiveShards.decrementAndGet();
				LOG.info("Subtask {} has reached the end of subscribed shard: {}",
					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
			}
		}
	}

	/**
	 * Register a new subscribed shard state.
	 *
	 * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
	 * @return the index of the registered state in subscribedShardsState; {@link ShardConsumer}s pass this
	 *         index back to {@link KinesisDataFetcher#updateState(int, SequenceNumber)}
	 */
	public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
		synchronized (checkpointLock) {
			subscribedShardsState.add(newSubscribedShardState);

			// If a registered shard has initial state that is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (will be the case
			// if the consumer had already finished reading a shard before we failed and restored), we determine that
			// this subtask has a new active shard
			if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
				this.numberOfActiveShards.incrementAndGet();
			}

			return subscribedShardsState.size() - 1;
		}
	}

	// ------------------------------------------------------------------------
	//  Miscellaneous utility functions
	// ------------------------------------------------------------------------

	/**
	 * Utility function to determine whether a shard should be subscribed by this consumer subtask.
	 *
	 * @param shard the shard to determine
	 * @param totalNumberOfConsumerSubtasks total number of consumer subtasks
	 * @param indexOfThisConsumerSubtask index of this consumer subtask
	 * @return true if the shard's hash code, modulo the number of subtasks, maps to this subtask's index
	 */
	private static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
														int totalNumberOfConsumerSubtasks,
														int indexOfThisConsumerSubtask) {
		return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
	}

	/**
	 * Creates the cached thread pool running the {@link ShardConsumer}s. Threads are daemon threads named
	 * {@code shardConsumers-<subtaskName>-thread-<n>}, where n increases for every thread the pool creates.
	 */
	private static ExecutorService createShardConsumersThreadPool(final String subtaskName) {
		return Executors.newCachedThreadPool(new ThreadFactory() {
			// shared by all threads created by this factory so that every thread gets a distinct number;
			// declaring it inside newThread() would name every thread "...-thread-0"
			private final AtomicLong threadCount = new AtomicLong(0);

			@Override
			public Thread newThread(Runnable runnable) {
				Thread thread = new Thread(runnable);
				thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
				thread.setDaemon(true);
				return thread;
			}
		});
	}

	/**
	 * Utility function to create an initial map of the last discovered shard id of each subscribed stream, set to null;
	 * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream()
	 *
	 * @param streams the list of subscribed streams
	 * @return the initial map for subscribedStreamsToLastDiscoveredShardIds
	 */
	protected static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
		HashMap<String, String> initial = new HashMap<>();
		for (String stream : streams) {
			initial.put(stream, null);
		}
		return initial;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java deleted file mode 100644 index 612a4a7..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kinesis.internals; - -import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; -import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; -import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; -import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; -import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; -import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; -import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Properties; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only. 
- */ -public class ShardConsumer<T> implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class); - - private final KinesisDeserializationSchema<T> deserializer; - - private final KinesisProxyInterface kinesis; - - private final int subscribedShardStateIndex; - - private final KinesisDataFetcher<T> fetcherRef; - - private final KinesisStreamShard subscribedShard; - - private final int maxNumberOfRecordsPerFetch; - private final long fetchIntervalMillis; - - private SequenceNumber lastSequenceNum; - - /** - * Creates a shard consumer. - * - * @param fetcherRef reference to the owning fetcher - * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to - * @param subscribedShard the shard this consumer is subscribed to - * @param lastSequenceNum the sequence number in the shard to start consuming - */ - public ShardConsumer(KinesisDataFetcher<T> fetcherRef, - Integer subscribedShardStateIndex, - KinesisStreamShard subscribedShard, - SequenceNumber lastSequenceNum) { - this(fetcherRef, - subscribedShardStateIndex, - subscribedShard, - lastSequenceNum, - KinesisProxy.create(fetcherRef.getConsumerConfiguration())); - } - - /** This constructor is exposed for testing purposes */ - protected ShardConsumer(KinesisDataFetcher<T> fetcherRef, - Integer subscribedShardStateIndex, - KinesisStreamShard subscribedShard, - SequenceNumber lastSequenceNum, - KinesisProxyInterface kinesis) { - this.fetcherRef = checkNotNull(fetcherRef); - this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex); - this.subscribedShard = checkNotNull(subscribedShard); - this.lastSequenceNum = checkNotNull(lastSequenceNum); - checkArgument( - !lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()), - "Should not start a ShardConsumer if the shard has already been completely read."); - - this.deserializer = fetcherRef.getClonedDeserializationSchema(); - - Properties 
consumerConfig = fetcherRef.getConsumerConfiguration(); - this.kinesis = kinesis; - this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty( - ConsumerConfigConstants.SHARD_GETRECORDS_MAX, - Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX))); - this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty( - ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, - Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS))); - } - - @SuppressWarnings("unchecked") - @Override - public void run() { - String nextShardItr; - - try { - // before infinitely looping, we set the initial nextShardItr appropriately - - if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) { - // if the shard is already closed, there will be no latest next record to get for this shard - if (subscribedShard.isClosed()) { - nextShardItr = null; - } else { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); - } - } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); - } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { - nextShardItr = null; - } else { - // we will be starting from an actual sequence number (due to restore from failure). - // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records - // from the last aggregated record; otherwise, we can simply start iterating from the record right after. 
- - if (lastSequenceNum.isAggregated()) { - String itrForLastAggregatedRecord = - kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); - - // get only the last aggregated record - GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1); - - List<UserRecord> fetchedRecords = deaggregateRecords( - getRecordsResult.getRecords(), - subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), - subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); - - long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber(); - for (UserRecord record : fetchedRecords) { - // we have found a dangling sub-record if it has a larger subsequence number - // than our last sequence number; if so, collect the record and update state - if (record.getSubSequenceNumber() > lastSubSequenceNum) { - deserializeRecordForCollectionAndUpdateState(record); - } - } - - // set the nextShardItr so we can continue iterating in the next while loop - nextShardItr = getRecordsResult.getNextShardIterator(); - } else { - // the last record was non-aggregated, so we can simply start from the next record - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); - } - } - - while(isRunning()) { - if (nextShardItr == null) { - fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()); - - // we can close this consumer thread once we've reached the end of the subscribed shard - break; - } else { - if (fetchIntervalMillis != 0) { - Thread.sleep(fetchIntervalMillis); - } - - GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch); - - // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding - List<UserRecord> fetchedRecords = deaggregateRecords( - getRecordsResult.getRecords(), - 
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), - subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); - - for (UserRecord record : fetchedRecords) { - deserializeRecordForCollectionAndUpdateState(record); - } - - nextShardItr = getRecordsResult.getNextShardIterator(); - } - } - } catch (Throwable t) { - fetcherRef.stopWithError(t); - } - } - - /** - * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed - * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread - * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service - * interrupt all currently running {@link ShardConsumer}s. - */ - private boolean isRunning() { - return !Thread.interrupted(); - } - - /** - * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last - * successfully collected sequence number in this shard consumer is also updated so that - * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard - * iterators if necessary. - * - * Note that the server-side Kinesis timestamp is attached to the record when collected. When the - * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default. 
- * - * @param record record to deserialize and collect - * @throws IOException - */ - private void deserializeRecordForCollectionAndUpdateState(UserRecord record) - throws IOException { - ByteBuffer recordData = record.getData(); - - byte[] dataBytes = new byte[recordData.remaining()]; - recordData.get(dataBytes); - - final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime(); - - final T value = deserializer.deserialize( - dataBytes, - record.getPartitionKey(), - record.getSequenceNumber(), - approxArrivalTimestamp, - subscribedShard.getStreamName(), - subscribedShard.getShard().getShardId()); - - SequenceNumber collectedSequenceNumber = (record.isAggregated()) - ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()) - : new SequenceNumber(record.getSequenceNumber()); - - fetcherRef.emitRecordAndUpdateState( - value, - approxArrivalTimestamp, - subscribedShardStateIndex, - collectedSequenceNumber); - - lastSequenceNum = collectedSequenceNumber; - } - - /** - * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected - * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on - * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should - * be used for the next call to this method. - * - * Note: it is important that this method is not called again before all the records from the last result have been - * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise - * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to - * incorrect shard iteration if the iterator had to be refreshed. 
- * - * @param shardItr shard iterator to use - * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt - * @return get records result - * @throws InterruptedException - */ - private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { - GetRecordsResult getRecordsResult = null; - while (getRecordsResult == null) { - try { - getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); - } catch (ExpiredIteratorException eiEx) { - LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + - " refreshing the iterator ...", shardItr, subscribedShard); - shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); - - // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator - if (fetchIntervalMillis != 0) { - Thread.sleep(fetchIntervalMillis); - } - } - } - return getRecordsResult; - } - - @SuppressWarnings("unchecked") - protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) { - return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java deleted file mode 100644 index 53ed11b..0000000 --- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.model; - -import com.amazonaws.services.kinesis.model.Shard; - -import java.io.Serializable; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information - * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to - * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges. 
- */ -public class KinesisStreamShard implements Serializable { - - private static final long serialVersionUID = -6004217801761077536L; - - private final String streamName; - private final Shard shard; - - private final int cachedHash; - - /** - * Create a new KinesisStreamShard - * - * @param streamName - * the name of the Kinesis stream that this shard belongs to - * @param shard - * the actual AWS Shard instance that will be wrapped within this KinesisStreamShard - */ - public KinesisStreamShard(String streamName, Shard shard) { - this.streamName = checkNotNull(streamName); - this.shard = checkNotNull(shard); - - // since our description of Kinesis Streams shards can be fully defined with the stream name and shard id, - // our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation - int hash = 17; - hash = 37 * hash + streamName.hashCode(); - hash = 37 * hash + shard.getShardId().hashCode(); - this.cachedHash = hash; - } - - public String getStreamName() { - return streamName; - } - - public boolean isClosed() { - return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null); - } - - public Shard getShard() { - return shard; - } - - @Override - public String toString() { - return "KinesisStreamShard{" + - "streamName='" + streamName + "'" + - ", shard='" + shard.toString() + "'}"; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof KinesisStreamShard)) { - return false; - } - - if (obj == this) { - return true; - } - - KinesisStreamShard other = (KinesisStreamShard) obj; - - return streamName.equals(other.getStreamName()) && shard.equals(other.getShard()); - } - - @Override - public int hashCode() { - return cachedHash; - } - - /** - * Utility function to compare two shard ids - * - * @param firstShardId first shard id to compare - * @param secondShardId second shard id to compare - * @return a value less than 0 if the first shard id is smaller than the second shard id, - 
* or a value larger than 0 the first shard is larger then the second shard id, - * or 0 if they are equal - */ - public static int compareShardIds(String firstShardId, String secondShardId) { - if (!isValidShardId(firstShardId)) { - throw new IllegalArgumentException("The first shard id has invalid format."); - } - - if (!isValidShardId(secondShardId)) { - throw new IllegalArgumentException("The second shard id has invalid format."); - } - - // digit segment of the shard id starts at index 8 - return Long.compare(Long.parseLong(firstShardId.substring(8)), Long.parseLong(secondShardId.substring(8))); - } - - /** - * Checks if a shard id has valid format. - * Kinesis stream shard ids have 12-digit numbers left-padded with 0's, - * prefixed with "shardId-", ex. "shardId-000000000015". - * - * @param shardId the shard id to check - * @return whether the shard id is valid - */ - public static boolean isValidShardId(String shardId) { - if (shardId == null) { return false; } - return shardId.matches("^shardId-\\d{12}"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java deleted file mode 100644 index 00181da..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.model; - -/** - * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number. - */ -public class KinesisStreamShardState { - - private KinesisStreamShard kinesisStreamShard; - private SequenceNumber lastProcessedSequenceNum; - - public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) { - this.kinesisStreamShard = kinesisStreamShard; - this.lastProcessedSequenceNum = lastProcessedSequenceNum; - } - - public KinesisStreamShard getKinesisStreamShard() { - return this.kinesisStreamShard; - } - - public SequenceNumber getLastProcessedSequenceNum() { - return this.lastProcessedSequenceNum; - } - - public void setLastProcessedSequenceNum(SequenceNumber update) { - this.lastProcessedSequenceNum = update; - } - - @Override - public String toString() { - return "KinesisStreamShardState{" + - "kinesisStreamShard='" + kinesisStreamShard.toString() + "'" + - ", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}"; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof KinesisStreamShardState)) { - return false; - } - - if (obj == this) { - return true; - } - - KinesisStreamShardState other = 
(KinesisStreamShardState) obj; - - return kinesisStreamShard.equals(other.getKinesisStreamShard()) && lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum()); - } - - @Override - public int hashCode() { - return 37 * (kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java deleted file mode 100644 index 8182201..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kinesis.model; - -import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; -import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; - -/** - * Special flag values for sequence numbers in shards to indicate special positions. - * The value is initially set by {@link FlinkKinesisConsumer} when {@link KinesisDataFetcher}s are created. - * The KinesisDataFetchers will use this value to determine how to retrieve the starting shard iterator from AWS Kinesis. - */ -public enum SentinelSequenceNumber { - - /** Flag value for shard's sequence numbers to indicate that the - * shard should start to be read from the latest incoming records */ - SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ), - - /** Flag value for shard's sequence numbers to indicate that the shard should - * start to be read from the earliest records that haven't expired yet */ - SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ), - - /** Flag value to indicate that we have already read the last record of this shard - * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */ - SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") ); - - private SequenceNumber sentinel; - - SentinelSequenceNumber(SequenceNumber sentinel) { - this.sentinel = sentinel; - } - - public SequenceNumber get() { - return sentinel; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java deleted file mode 100644 index 021f53f..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.model; - -import java.io.Serializable; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A serializable representation of a Kinesis record's sequence number. It has two fields: the main sequence number, - * and also a subsequence number. If this {@link SequenceNumber} is referring to an aggregated Kinesis record, the - * subsequence number will be a non-negative value representing the order of the sub-record within the aggregation. 
- */ -public class SequenceNumber implements Serializable { - - private static final long serialVersionUID = 876972197938972667L; - - private static final String DELIMITER = "-"; - - private final String sequenceNumber; - private final long subSequenceNumber; - - private final int cachedHash; - - /** - * Create a new instance for a non-aggregated Kinesis record without a subsequence number. - * @param sequenceNumber the sequence number - */ - public SequenceNumber(String sequenceNumber) { - this(sequenceNumber, -1); - } - - /** - * Create a new instance, with the specified sequence number and subsequence number. - * To represent the sequence number for a non-aggregated Kinesis record, the subsequence number should be -1. - * Otherwise, give a non-negative sequence number to represent an aggregated Kinesis record. - * - * @param sequenceNumber the sequence number - * @param subSequenceNumber the subsequence number (-1 to represent non-aggregated Kinesis records) - */ - public SequenceNumber(String sequenceNumber, long subSequenceNumber) { - this.sequenceNumber = checkNotNull(sequenceNumber); - this.subSequenceNumber = subSequenceNumber; - - this.cachedHash = 37 * (sequenceNumber.hashCode() + Long.valueOf(subSequenceNumber).hashCode()); - } - - public boolean isAggregated() { - return subSequenceNumber >= 0; - } - - public String getSequenceNumber() { - return sequenceNumber; - } - - public long getSubSequenceNumber() { - return subSequenceNumber; - } - - @Override - public String toString() { - if (isAggregated()) { - return sequenceNumber + DELIMITER + subSequenceNumber; - } else { - return sequenceNumber; - } - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof SequenceNumber)) { - return false; - } - - if (obj == this) { - return true; - } - - SequenceNumber other = (SequenceNumber) obj; - - return sequenceNumber.equals(other.getSequenceNumber()) - && (subSequenceNumber == other.getSubSequenceNumber()); - } - - @Override - public int 
hashCode() { - return cachedHash; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java deleted file mode 100644 index 04b1654..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kinesis.proxy; - -import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; - -import java.util.LinkedList; -import java.util.List; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -/** - * Basic model class to bundle the shards retrieved from Kinesis on a {@link KinesisProxyInterface#getShardList(Map)} call. - */ -public class GetShardListResult { - - private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>(); - - public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) { - if (!streamsToRetrievedShardList.containsKey(stream)) { - streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>()); - } - streamsToRetrievedShardList.get(stream).add(retrievedShard); - } - - public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) { - if (retrievedShards.size() != 0) { - if (!streamsToRetrievedShardList.containsKey(stream)) { - streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>()); - } - streamsToRetrievedShardList.get(stream).addAll(retrievedShards); - } - } - - public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) { - if (!streamsToRetrievedShardList.containsKey(stream)) { - return null; - } else { - return streamsToRetrievedShardList.get(stream); - } - } - - public KinesisStreamShard getLastSeenShardOfStream(String stream) { - if (!streamsToRetrievedShardList.containsKey(stream)) { - return null; - } else { - return streamsToRetrievedShardList.get(stream).getLast(); - } - } - - public boolean hasRetrievedShards() { - return !streamsToRetrievedShardList.isEmpty(); - } - - public Set<String> getStreamsWithRetrievedShards() { - return streamsToRetrievedShardList.keySet(); - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java deleted file mode 100644 index 9ffc8e6..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kinesis.proxy; - -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.model.DescribeStreamRequest; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.LimitExceededException; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.StreamStatus; -import com.amazonaws.services.kinesis.model.Shard; -import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; -import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; -import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.Map; -import java.util.Random; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Kinesis proxy implementation - a utility class that is used as a proxy to make - * calls to AWS Kinesis for several functions, such as getting a list of shards and - * fetching a batch of data records starting from a specified record sequence number. - * - * NOTE: - * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. 
- * This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed - * functionality for the Flink Kinesis Connecter since the consumer may simultaneously read from multiple Kinesis streams. - */ -public class KinesisProxy implements KinesisProxyInterface { - - private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); - - /** The actual Kinesis client from the AWS SDK that we will be using to make calls */ - private final AmazonKinesisClient kinesisClient; - - /** Random seed used to calculate backoff jitter for Kinesis operations */ - private final static Random seed = new Random(); - - // ------------------------------------------------------------------------ - // describeStream() related performance settings - // ------------------------------------------------------------------------ - - /** Base backoff millis for the describe stream operation */ - private final long describeStreamBaseBackoffMillis; - - /** Maximum backoff millis for the describe stream operation */ - private final long describeStreamMaxBackoffMillis; - - /** Exponential backoff power constant for the describe stream operation */ - private final double describeStreamExpConstant; - - // ------------------------------------------------------------------------ - // getRecords() related performance settings - // ------------------------------------------------------------------------ - - /** Base backoff millis for the get records operation */ - private final long getRecordsBaseBackoffMillis; - - /** Maximum backoff millis for the get records operation */ - private final long getRecordsMaxBackoffMillis; - - /** Exponential backoff power constant for the get records operation */ - private final double getRecordsExpConstant; - - /** Maximum attempts for the get records operation */ - private final int getRecordsMaxAttempts; - - // ------------------------------------------------------------------------ - // getShardIterator() 
related performance settings - // ------------------------------------------------------------------------ - - /** Base backoff millis for the get shard iterator operation */ - private final long getShardIteratorBaseBackoffMillis; - - /** Maximum backoff millis for the get shard iterator operation */ - private final long getShardIteratorMaxBackoffMillis; - - /** Exponential backoff power constant for the get shard iterator operation */ - private final double getShardIteratorExpConstant; - - /** Maximum attempts for the get shard iterator operation */ - private final int getShardIteratorMaxAttempts; - - /** - * Create a new KinesisProxy based on the supplied configuration properties - * - * @param configProps configuration properties containing AWS credential and AWS region info - */ - private KinesisProxy(Properties configProps) { - checkNotNull(configProps); - - this.kinesisClient = AWSUtil.createKinesisClient(configProps); - - this.describeStreamBaseBackoffMillis = Long.valueOf( - configProps.getProperty( - ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, - Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE))); - this.describeStreamMaxBackoffMillis = Long.valueOf( - configProps.getProperty( - ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, - Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX))); - this.describeStreamExpConstant = Double.valueOf( - configProps.getProperty( - ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, - Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))); - - this.getRecordsBaseBackoffMillis = Long.valueOf( - configProps.getProperty( - ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, - Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE))); - this.getRecordsMaxBackoffMillis = Long.valueOf( - configProps.getProperty( - ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, - 
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX))); - this.getRecordsExpConstant = Double.valueOf( - configProps.getProperty( - ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, - Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT))); - this.getRecordsMaxAttempts = Integer.valueOf( - configProps.getProperty( - ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, - Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES))); - - this.getShardIteratorBaseBackoffMillis = Long.valueOf( - configProps.getProperty( - ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, - Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE))); - this.getShardIteratorMaxBackoffMillis = Long.valueOf( - configProps.getProperty( - ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, - Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX))); - this.getShardIteratorExpConstant = Double.valueOf( - configProps.getProperty( - ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, - Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT))); - this.getShardIteratorMaxAttempts = Integer.valueOf( - configProps.getProperty( - ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, - Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES))); - - } - - /** - * Creates a Kinesis proxy. 
- * - * @param configProps configuration properties - * @return the created kinesis proxy - */ - public static KinesisProxyInterface create(Properties configProps) { - return new KinesisProxy(configProps); - } - - /** - * {@inheritDoc} - */ - @Override - public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException { - final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); - getRecordsRequest.setShardIterator(shardIterator); - getRecordsRequest.setLimit(maxRecordsToGet); - - GetRecordsResult getRecordsResult = null; - - int attempt = 0; - while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { - try { - getRecordsResult = kinesisClient.getRecords(getRecordsRequest); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); - LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); - } - } - - if (getRecordsResult == null) { - throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + - " retry attempts returned ProvisionedThroughputExceededException."); - } - - return getRecordsResult; - } - - /** - * {@inheritDoc} - */ - @Override - public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException { - GetShardListResult result = new GetShardListResult(); - - for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) { - String stream = streamNameWithLastSeenShardId.getKey(); - String lastSeenShardId = streamNameWithLastSeenShardId.getValue(); - result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId)); - } - return result; - } - - /** - * {@inheritDoc} - */ - @Override - public String 
getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException { - GetShardIteratorResult getShardIteratorResult = null; - - int attempt = 0; - while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) { - try { - getShardIteratorResult = - kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); - LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); - } - } - - if (getShardIteratorResult == null) { - throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + - " retry attempts returned ProvisionedThroughputExceededException."); - } - return getShardIteratorResult.getShardIterator(); - } - - private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { - List<KinesisStreamShard> shardsOfStream = new ArrayList<>(); - - DescribeStreamResult describeStreamResult; - do { - describeStreamResult = describeStream(streamName, lastSeenShardId); - - List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); - for (Shard shard : shards) { - shardsOfStream.add(new KinesisStreamShard(streamName, shard)); - } - - if (shards.size() != 0) { - lastSeenShardId = shards.get(shards.size() - 1).getShardId(); - } - } while (describeStreamResult.getStreamDescription().isHasMoreShards()); - - return shardsOfStream; - } - - /** - * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. 
- * - * This method is using a "full jitter" approach described in AWS's article, - * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>. - * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This - * jitter backoff approach will help distribute calls across the fetchers over time. - * - * @param streamName the stream to describe - * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) - * @return the result of the describe stream operation - */ - private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { - final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); - describeStreamRequest.setExclusiveStartShardId(startShardId); - - DescribeStreamResult describeStreamResult = null; - - // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). - int attemptCount = 0; - while (describeStreamResult == null) { // retry until we get a result - try { - describeStreamResult = kinesisClient.describeStream(describeStreamRequest); - } catch (LimitExceededException le) { - long backoffMillis = fullJitterBackoff( - describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". 
Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); - } catch (ResourceNotFoundException re) { - throw new RuntimeException("Error while getting stream details", re); - } - } - - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); - if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) { - if (LOG.isWarnEnabled()) { - LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + - "describeStream operation will not contain any shard information."); - } - } - - // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive - // start shard id in the returned shards list; check if we need to remove these erroneously returned shards - if (startShardId != null) { - List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); - Iterator<Shard> shardItr = shards.iterator(); - while (shardItr.hasNext()) { - if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) { - shardItr.remove(); - } - } - } - - return describeStreamResult; - } - - private static long fullJitterBackoff(long base, long max, double power, int attempt) { - long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt)); - return (long)(seed.nextDouble()*exponentialBackoff); // random jitter between 0 and the exponential backoff - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java deleted file mode 100644 index 39ddc52..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.proxy; - -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; - -import java.util.Map; - -/** - * Interface for a Kinesis proxy that operates on multiple Kinesis streams within the same AWS service region. - */ -public interface KinesisProxyInterface { - - /** - * Get a shard iterator from the specified position in a shard. - * The retrieved shard iterator can be used in {@link KinesisProxyInterface#getRecords(String, int)}} - * to read data from the Kinesis shard. 
- * - * @param shard the shard to get the iterator - * @param shardIteratorType the iterator type, defining how the shard is to be iterated - * (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER) - * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST - * @return shard iterator which can be used to read data from Kinesis - * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the - * operation has exceeded the rate limit; this exception will be thrown - * if the backoff is interrupted. - */ - String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException; - - /** - * Get the next batch of data records using a specific shard iterator - * - * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading - * @param maxRecordsToGet the maximum amount of records to retrieve for this batch - * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch - * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the - * operation has exceeded the rate limit; this exception will be thrown - * if the backoff is interrupted. - */ - GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException; - - /** - * Get shard list of multiple Kinesis streams, ignoring the - * shards of each stream before a specified last seen shard id. - * - * @param streamNamesWithLastSeenShardIds a map with stream as key, and last seen shard id as value - * @return result of the shard list query - * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the - * operation has exceeded the rate limit; this exception will be thrown - * if the backoff is interrupted. 
- */ - GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException; -}
