[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348255#comment-15348255 ]
ASF GitHub Bot commented on FLINK-3231:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2131#discussion_r68397478
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -17,156 +17,481 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher runs several threads to accomplish
+ * the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *            of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *            subscribed to the same shard) and deterministic across subtask restores (the subtask will always subscribe
+ *            to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard the fetcher should start subscribing</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) pending shards for subscription, and 2) the last processed sequence numbers of
+ * each subscribed shard. All operations on these states across multiple threads should only be done using the handler
+ * methods provided in this class.
 */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
- /** Config properties for the Flink Kinesis Consumer */
+ // ------------------------------------------------------------------------
+ //  Consumer-wide settings
+ // ------------------------------------------------------------------------
+
+ /** Configuration properties for the Flink Kinesis Consumer */
private final Properties configProps;
- /** The name of the consumer task in which this fetcher was instantiated */
- private final String taskName;
+ /** The list of Kinesis streams that the consumer is subscribing to */
+ private final List<String> streams;
+
+ /**
+ * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+ * Note that since this might not be thread-safe, multiple threads in the fetcher using this must
+ * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
+ */
+ private final KinesisDeserializationSchema<T> deserializationSchema;
+
+ // ------------------------------------------------------------------------
+ //  Subtask-specific settings
+ // ------------------------------------------------------------------------
+
+ /** Runtime context of the subtask that this fetcher was created in */
+ private final RuntimeContext runtimeContext;
+
+ // ------------------------------------------------------------------------
+ //  Executor services to run created threads
+ // ------------------------------------------------------------------------
+
+ /** Executor service to run the {@link ShardDiscoverer} and {@link ShardSubscriber} */
+ private final ExecutorService shardDiscovererAndSubscriberExecutor;
+
+ /** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
+ private final ExecutorService shardConsumersExecutor;
- /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */
- private HashMap<KinesisStreamShard, String> assignedShardsWithStartingSequenceNum;
+ // ------------------------------------------------------------------------
+ //  Managed state, accessed and updated across multiple threads
+ // ------------------------------------------------------------------------
- /** Reference to the thread that executed run() */
- private volatile Thread mainThread;
+ /**
+ * Blocking queue for newly discovered shards, with their states, that this fetcher should consume.
+ * The {@link ShardDiscoverer} will add shards with their initial position as state to this queue as shards are discovered,
+ * while the {@link ShardSubscriber} polls this queue to start subscribing to the newly discovered shards.
+ */
+ private final BlockingQueue<KinesisStreamShardState> pendingShards;
+
+ /**
+ * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The shard
+ * subscriber will add to this list as it polls pending shards. Shard consumer threads update the last processed
+ * sequence number of subscribed shards as they fetch and process records.
+ *
+ * <p>Note that since multiple threads will be performing operations on this list, all operations must be wrapped in
+ * synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose, all threads must use
+ * the following thread-safe methods this class provides to operate on this list:
+ * <ul>
+ *     <li>{@link KinesisDataFetcher#addAndStartConsumingNewSubscribedShard(KinesisStreamShardState)}</li>
+ *     <li>{@link KinesisDataFetcher#updateState(int, String)}</li>
+ *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(Object, int, String)}</li>
+ * </ul>
+ */
+ private final List<KinesisStreamShardState> subscribedShardsState;
+
+ private final SourceFunction.SourceContext<T> sourceContext;
+
+ /** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
+ private final Object checkpointLock;
- /** Reference to the first error thrown by any of the spawned shard connection threads */
+ /** This flag is set to true if the fetcher is provided a non-null and non-empty restored state */
+ private final boolean isRestoredFromCheckpoint;
+
+ /** Reference to the first error thrown by any of the created threads */
private final AtomicReference<Throwable> error;
private volatile boolean running = true;
/**
- * Creates a new Kinesis Data Fetcher for the specified set of shards
+ * Creates a Kinesis Data Fetcher.
 *
- * @param assignedShards the shards that this fetcher will pull data from
- * @param configProps the configuration properties of this Flink Kinesis Consumer
- * @param taskName the task name of this consumer task
+ * @param streams the streams to subscribe to
+ * @param sourceContext context of the source function
+ * @param runtimeContext this subtask's runtime context
+ * @param configProps the consumer configuration properties
+ * @param restoredState state of subscribed shards that the fetcher should restore to
+ * @param deserializationSchema deserialization schema
*/
- public KinesisDataFetcher(List<KinesisStreamShard> assignedShards, Properties configProps, String taskName) {
+ public KinesisDataFetcher(List<String> streams,
+                           SourceFunction.SourceContext<T> sourceContext,
+                           RuntimeContext runtimeContext,
+                           Properties configProps,
+                           Map<KinesisStreamShard, String> restoredState,
+                           KinesisDeserializationSchema<T> deserializationSchema) {
+     this(streams,
+          sourceContext,
+          sourceContext.getCheckpointLock(),
+          runtimeContext,
+          configProps,
+          restoredState,
+          deserializationSchema,
+          new AtomicReference<Throwable>(),
+          new LinkedBlockingQueue<KinesisStreamShardState>(),
+          new LinkedList<KinesisStreamShardState>());
+ }
+
+ /** This constructor is exposed for testing purposes */
+ protected KinesisDataFetcher(List<String> streams,
+                              SourceFunction.SourceContext<T> sourceContext,
+                              Object checkpointLock,
+                              RuntimeContext runtimeContext,
+                              Properties configProps,
+                              Map<KinesisStreamShard, String> restoredState,
+                              KinesisDeserializationSchema<T> deserializationSchema,
+                              AtomicReference<Throwable> error,
+                              LinkedBlockingQueue<KinesisStreamShardState> pendingShardsQueue,
+                              LinkedList<KinesisStreamShardState> subscribedShardsState) {
+ this.streams = checkNotNull(streams);
this.configProps = checkNotNull(configProps);
- this.assignedShardsWithStartingSequenceNum = new HashMap<>();
- for (KinesisStreamShard shard : assignedShards) {
- assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString());
+ this.sourceContext = checkNotNull(sourceContext);
+ this.checkpointLock = checkNotNull(checkpointLock);
+ this.runtimeContext = checkNotNull(runtimeContext);
+ this.deserializationSchema = checkNotNull(deserializationSchema);
+
+ this.error = error;
+ this.pendingShards = pendingShardsQueue;
+ this.subscribedShardsState = subscribedShardsState;
+
+ this.shardDiscovererAndSubscriberExecutor =
+     createShardDiscovererAndSubscriberThreadPool(runtimeContext.getTaskNameWithSubtasks());
+ this.shardConsumersExecutor =
+     createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+
+ this.isRestoredFromCheckpoint = (restoredState != null && restoredState.entrySet().size() != 0);
+
+ // if there is state to restore from the last checkpoint, we seed it as the initially discovered shards
+ if (isRestoredFromCheckpoint) {
+     seedPendingShardsWithRestoredState(restoredState, this.pendingShards);
}
- this.taskName = taskName;
- this.error = new AtomicReference<>();
}
/**
- * Advance a shard's starting sequence number to a specified value
+ * Starts the fetcher. After starting the fetcher, it can only
+ * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
 *
- * @param streamShard the shard to perform the advance on
- * @param sequenceNum the sequence number to advance to
+ * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
*/
- public void advanceSequenceNumberTo(KinesisStreamShard streamShard, String sequenceNum) {
-     if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) {
-         throw new IllegalArgumentException("Can't advance sequence number on a shard we are not going to read.");
+ public void runFetcher() throws Exception {
+
+     if (LOG.isInfoEnabled()) {
+         LOG.info("Subtask {} is starting the shard discoverer ...", runtimeContext.getIndexOfThisSubtask());
+     }
+     shardDiscovererAndSubscriberExecutor.submit(new ShardDiscoverer<>(this));
+
+     // after this point we will start fetching data from Kinesis and update internal state,
+     // so we check one last time that we are still running before continuing
+     if (!running) {
+         return;
+     }
+
+     if (LOG.isInfoEnabled()) {
+         LOG.info("Subtask {} is starting the shard subscriber ...", runtimeContext.getIndexOfThisSubtask());
+     }
+     shardDiscovererAndSubscriberExecutor.submit(new ShardSubscriber<>(this));
+
+     while (running) {
+         // once either shutdownFetcher() or stopWithError()
+         // is called, we will escape this loop
+     }
--- End diff ---
This loop is extremely inefficient. It is a very busy-wait loop (one CPU core will constantly burn cycles on it).
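For reference, a standard way to avoid the spin is to park the main thread on a monitor and have the shutdown/error paths wake it up. A minimal sketch (class and member names are illustrative, not the PR's actual code):

```java
// Hypothetical sketch: park the waiting thread instead of spinning.
public class NonSpinningWait {

	private final Object shutdownWaitLock = new Object();
	private volatile boolean running = true;

	/** Blocks the calling thread without burning CPU until shutdown is requested. */
	public void awaitShutdown() throws InterruptedException {
		synchronized (shutdownWaitLock) {
			while (running) {
				// wait() releases the monitor and parks the thread;
				// it is woken up by the notifyAll() in requestShutdown()
				shutdownWaitLock.wait();
			}
		}
	}

	/** Called from shutdownFetcher()/stopWithError()-like paths to wake the waiter. */
	public void requestShutdown() {
		running = false;
		synchronized (shutdownWaitLock) {
			shutdownWaitLock.notifyAll();
		}
	}
}
```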
> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
> Issue Type: Sub-task
> Components: Kinesis Connector, Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis
> users can choose to "merge" and "split" shards at any time for adjustable
> stream throughput capacity. This article explains this quite clearly:
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic
> version of the Kinesis consumer
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task
> mapping is done in a simple round-robin-like distribution which can be
> locally determined at each Flink consumer task (the Flink Kafka consumer does
> this too).
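>
> A sketch of what such a locally determined assignment could look like
> (hypothetical names; it assumes every subtask sees the same stable ordering
> of shards):
>
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> public class StaticShardAssigner {
>
> 	/** Returns the subset of shards that the given subtask should consume. */
> 	public static <S> List<S> assignedShards(List<S> allShards, int subtaskIndex, int parallelism) {
> 		List<S> assigned = new ArrayList<>();
> 		for (int i = 0; i < allShards.size(); i++) {
> 			// round-robin: shard i goes to subtask (i mod parallelism), so the
> 			// mapping is identical on every subtask without any coordination
> 			if (i % parallelism == subtaskIndex) {
> 				assigned.add(allShards.get(i));
> 			}
> 		}
> 		return assigned;
> 	}
> }
> {code}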
> To handle Kinesis resharding, we will need some way to let the Flink consumer
> tasks coordinate which shards they are currently handling, and allow a task
> to ask the coordinator for a shard reassignment when it encounters a closed
> shard at runtime (shards are closed by Kinesis when they are merged or
> split). A sketch of such a coordination interface follows.
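>
> A hypothetical coordinator interface for that flow (illustrative only; the
> names and methods are assumptions, not a committed design):
>
> {code:java}
> import java.util.List;
>
> public interface ShardCoordinator {
>
> 	/** The shards currently assigned to the given subtask. */
> 	List<String> currentAssignment(int subtaskIndex);
>
> 	/**
> 	 * Reports a shard that Kinesis has closed (due to a merge/split) and asks
> 	 * the coordinator which new shards this subtask should pick up instead.
> 	 */
> 	List<String> reportClosedShardAndReassign(int subtaskIndex, String closedShardId);
> }
> {code}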
> We need a centralized coordinator state store which is visible to all Flink
> consumer tasks. Tasks can use this state store to locally determine which
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the
> coordination, but as described in
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use
> KCL for the implementation of the consumer if we want to leverage Flink's
> checkpointing mechanics. For our own implementation, ZooKeeper could be used
> for this state store, but that would require users to set up ZooKeeper for
> the connector to work.
> Since this feature requires extensive work, it is tracked as a separate
> sub-task of the basic implementation
> (https://issues.apache.org/jira/browse/FLINK-3229).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)