Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68408986
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
    @@ -17,156 +17,481 @@
     
     package org.apache.flink.streaming.connectors.kinesis.internals;
     
    +import org.apache.flink.api.common.functions.RuntimeContext;
     import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    -import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
     import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
     import org.apache.flink.util.InstantiationUtil;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
    -import java.util.Map;
    -import java.util.HashMap;
    +
    +import java.util.LinkedList;
     import java.util.List;
    -import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Map;
     import java.util.Properties;
    +
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.concurrent.atomic.AtomicReference;
     
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
    - * The fetcher spawns a single thread for connection to each shard.
    + * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
    + * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher runs several threads to accomplish
    + * the following:
    + * <ul>
    + *     <li>1. continuously poll Kinesis to discover shards that the subtask 
should subscribe to. The subscribed subset
    + *                   of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
    + *                   subscribed to the same shard) and deterministic across 
subtask restores (the subtask will always subscribe
    + *                   to the same subset of shards even after 
restoring)</li>
    + *     <li>2. decide where in each discovered shard the fetcher should 
start subscribing from</li>
    + *     <li>3. subscribe to shards by creating a single thread for each 
shard</li>
    + * </ul>
    + *
    + * <p>The fetcher manages two states: 1) pending shards for subscription, 
and 2) last processed sequence numbers of
    + * each subscribed shard. All operations on the states in multiple threads 
should only be done using the handler methods
    + * provided in this class.
      */
    -public class KinesisDataFetcher {
    +public class KinesisDataFetcher<T> {
     
        private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
     
    -   /** Config properties for the Flink Kinesis Consumer */
    +   // 
------------------------------------------------------------------------
    +   //  Consumer-wide settings
    +   // 
------------------------------------------------------------------------
    +
    +   /** Configuration properties for the Flink Kinesis Consumer */
        private final Properties configProps;
     
    -   /** The name of the consumer task that this fetcher was instantiated */
    -   private final String taskName;
    +   /** The list of Kinesis streams that the consumer is subscribing to */
    +   private final List<String> streams;
    +
    +   /**
    +    * The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
    +    * Note that since this might not be thread-safe, multiple threads in 
the fetcher using this must
    +    * clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
    +    */
    +   private final KinesisDeserializationSchema<T> deserializationSchema;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Subtask-specific settings
    +   // 
------------------------------------------------------------------------
    +
    +   /** Runtime context of the subtask that this fetcher was created in */
    +   private final RuntimeContext runtimeContext;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Executor services to run created threads
    +   // 
------------------------------------------------------------------------
    +
    +   /** Executor service to run the {@link ShardDiscoverer} and {@link 
ShardSubscriber} */
    +   private final ExecutorService shardDiscovererAndSubscriberExecutor;
    +
    +   /** Executor service to run {@link ShardConsumer}s to consume Kinesis 
shards */
    +   private final ExecutorService shardConsumersExecutor;
     
    -   /** Information of the shards that this fetcher handles, along with the 
sequence numbers that they should start from */
    -   private HashMap<KinesisStreamShard, String> 
assignedShardsWithStartingSequenceNum;
    +   // 
------------------------------------------------------------------------
    +   //  Managed state, accessed and updated across multiple threads
    +   // 
------------------------------------------------------------------------
     
    -   /** Reference to the thread that executed run() */
    -   private volatile Thread mainThread;
    +   /**
    +    * Blocking queue for newly discovered shards, with their states, that 
this fetcher should consume.
    +    * The {@link ShardDiscoverer} will add shards with initial position as 
state to this queue as shards are discovered,
    +    * while the {@link ShardSubscriber} polls this queue to start 
subscribing to the newly discovered shards.
    +    */
    +   private final BlockingQueue<KinesisStreamShardState> pendingShards;
    +
    +   /**
    +    * The shards, along with their last processed sequence numbers, that 
this fetcher is subscribed to. The shard
    +    * subscriber will add to this list as it polls pending shards. Shard 
consumer threads update the last processed
    +    * sequence number of subscribed shards as they fetch and process 
records.
    +    *
    +    * <p>Note that since multiple threads will be performing operations on 
this list, all operations must be wrapped in
    +    * synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} 
lock. For this purpose, all threads must use
    +    * the following thread-safe methods this class provides to operate on 
this list:
    +    * <ul>
    +    *     <li>{@link 
KinesisDataFetcher#addAndStartConsumingNewSubscribedShard(KinesisStreamShardState)}</li>
    +    *     <li>{@link KinesisDataFetcher#updateState(int, String)}</li>
    +    *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(Object, 
int, String)}</li>
    +    * </ul>
    +    */
    +   private final List<KinesisStreamShardState> subscribedShardsState;
    +
    +   private final SourceFunction.SourceContext<T> sourceContext;
    +
    +   /** Checkpoint lock, also used to synchronize operations on 
subscribedShardsState */
    +   private final Object checkpointLock;
     
    -   /** Reference to the first error thrown by any of the spawned shard 
connection threads */
    +   /** This flag is set to true if the fetcher is provided a non-null and 
non-empty restored state */
    +   private final boolean isRestoredFromCheckpoint;
    +
    +   /** Reference to the first error thrown by any of the created threads */
        private final AtomicReference<Throwable> error;
     
        private volatile boolean running = true;
     
        /**
    -    * Creates a new Kinesis Data Fetcher for the specified set of shards
    +    * Creates a Kinesis Data Fetcher.
         *
    -    * @param assignedShards the shards that this fetcher will pull data 
from
    -    * @param configProps the configuration properties of this Flink 
Kinesis Consumer
    -    * @param taskName the task name of this consumer task
    +    * @param streams the streams to subscribe to
    +    * @param sourceContext context of the source function
    +    * @param runtimeContext this subtask's runtime context
    +    * @param configProps the consumer configuration properties
    +    * @param restoredState state of subscribed shards that the fetcher 
should restore to
    +    * @param deserializationSchema deserialization schema
         */
    -   public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, 
Properties configProps, String taskName) {
    +   public KinesisDataFetcher(List<String> streams,
    +                                                   
SourceFunction.SourceContext<T> sourceContext,
    +                                                   RuntimeContext 
runtimeContext,
    +                                                   Properties configProps,
    +                                                   Map<KinesisStreamShard, 
String> restoredState,
    +                                                   
KinesisDeserializationSchema<T> deserializationSchema) {
    +           this(streams,
    +                   sourceContext,
    +                   sourceContext.getCheckpointLock(),
    +                   runtimeContext,configProps,
    +                   restoredState,
    +                   deserializationSchema,
    +                   new AtomicReference<Throwable>(),
    +                   new LinkedBlockingQueue<KinesisStreamShardState>(),
    +                   new LinkedList<KinesisStreamShardState>());
    +   }
    +
    +   /** This constructor is exposed for testing purposes */
    +   protected KinesisDataFetcher(List<String> streams,
    +                                                           
SourceFunction.SourceContext<T> sourceContext,
    +                                                           Object 
checkpointLock,
    +                                                           RuntimeContext 
runtimeContext,
    +                                                           Properties 
configProps,
    +                                                           
Map<KinesisStreamShard, String> restoredState,
    +                                                           
KinesisDeserializationSchema<T> deserializationSchema,
    +                                                           
AtomicReference<Throwable> error,
    +                                                           
LinkedBlockingQueue<KinesisStreamShardState> pendingShardsQueue,
    +                                                           
LinkedList<KinesisStreamShardState> subscribedShardsState) {
    +           this.streams = checkNotNull(streams);
                this.configProps = checkNotNull(configProps);
    -           this.assignedShardsWithStartingSequenceNum = new HashMap<>();
    -           for (KinesisStreamShard shard : assignedShards) {
    -                   assignedShardsWithStartingSequenceNum.put(shard, 
SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
    +           this.sourceContext = checkNotNull(sourceContext);
    +           this.checkpointLock = checkNotNull(checkpointLock);
    +           this.runtimeContext = checkNotNull(runtimeContext);
    +           this.deserializationSchema = 
checkNotNull(deserializationSchema);
    +
    +           this.error = error;
    +           this.pendingShards = pendingShardsQueue;
    +           this.subscribedShardsState = subscribedShardsState;
    +
    +           this.shardDiscovererAndSubscriberExecutor =
    +                   
createShardDiscovererAndSubscriberThreadPool(runtimeContext.getTaskNameWithSubtasks());
    +           this.shardConsumersExecutor =
    +                   
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
    +
    +           this.isRestoredFromCheckpoint = (restoredState != null && 
restoredState.entrySet().size() != 0);
    +
    +           // if there is state to restore from last checkpoint, we seed 
them as initially discovered shards
    +           if (isRestoredFromCheckpoint) {
    +                   seedPendingShardsWithRestoredState(restoredState, 
this.pendingShards);
                }
    -           this.taskName = taskName;
    -           this.error = new AtomicReference<>();
        }
     
        /**
    -    * Advance a shard's starting sequence number to a specified value
    +    * Starts the fetcher. After starting the fetcher, it can only
    +    * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
         *
    -    * @param streamShard the shard to perform the advance on
    -    * @param sequenceNum the sequence number to advance to
    +    * @throws Exception the first error or exception thrown by the fetcher 
or any of the threads created by the fetcher.
         */
    -   public void advanceSequenceNumberTo(KinesisStreamShard streamShard, 
String sequenceNum) {
    -           if 
(!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
    -                   throw new IllegalArgumentException("Can't advance 
sequence number on a shard we are not going to read.");
    +   public void runFetcher() throws Exception {
    +
    +           if (LOG.isInfoEnabled()) {
    +                   LOG.info("Subtask {} is starting the shard discoverer 
...", runtimeContext.getIndexOfThisSubtask());
    +           }
    +           shardDiscovererAndSubscriberExecutor.submit(new 
ShardDiscoverer<>(this));
    +
    +           // after this point we will start fetching data from Kinesis 
and update internal state,
    +           // so we check that we are running for the last time before 
continuing
    +           if (!running) {
    +                   return;
    +           }
    +
    +           if (LOG.isInfoEnabled()) {
    +                   LOG.info("Subtask {} is starting the shard subscriber 
...", runtimeContext.getIndexOfThisSubtask());
    +           }
    +           shardDiscovererAndSubscriberExecutor.submit(new 
ShardSubscriber<>(this));
    +
    +           while (running) {
    +                   // once either shutdownFetcher() or stopWithError()
    +                   // is called, we will escape this loop
    +           }
    --- End diff --
    
    Yup, this is really bad: the empty `while (running)` loop busy-spins a CPU core until the fetcher is stopped ... will fix!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to