[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350815#comment-15350815
 ] 

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68560835
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java
 ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.internals;
    +
    +import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
    +import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
    +import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
    +import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
    +import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
    +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
    +import 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * This runnable is in charge of discovering new shards that a fetcher 
should subscribe to.
    + * It is submitted to {@link 
KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs 
until the
    + * fetcher is closed. Whenever it discovers a new shard that should be 
subscribed to, the shard is added to the
    + * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. 
where in the new shard we should start
    + * consuming from.
    + */
    +public class ShardDiscoverer<T> implements Runnable {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(ShardDiscoverer.class);
    +
    +   /** This fetcher reference is used to add discovered shards to the 
pending shards queue */
    +   private final KinesisDataFetcher fetcherRef;
    +
    +   /** Kinesis proxy to retrieve shard lists from Kinesis */
    +   private final KinesisProxyInterface kinesis;
    +
    +   /**
    +    * The last seen shard of each stream. Since new Kinesis shards are 
always created in ascending ids (regardless of
    +    * whether the new shard was a result of a shard split or merge), this 
state can be used when calling
    +    * {@link KinesisProxyInterface#getShardList(Map)} to ignore shards we 
have already discovered before.
    +    */
    +   private final Map<String,String> streamToLastSeenShard;
    +
    +   private final int totalNumberOfConsumerSubtasks;
    +   private final int indexOfThisConsumerSubtask;
    +
    +   /**
    +    * Create a new shard discoverer.
    +    *
    +    * @param fetcherRef reference to the owning fetcher
    +    */
    +   public ShardDiscoverer(KinesisDataFetcher<T> fetcherRef) {
    +           this(fetcherRef, 
KinesisProxy.create(fetcherRef.getConsumerConfiguration()), new HashMap<String, 
String>());
    +   }
    +
    +   /** This constructor is exposed for testing purposes */
    +   protected ShardDiscoverer(KinesisDataFetcher<T> fetcherRef,
    +                                                   KinesisProxyInterface 
kinesis,
    +                                                   Map<String,String> 
streamToLastSeenShard) {
    +           this.fetcherRef = checkNotNull(fetcherRef);
    +           this.kinesis = checkNotNull(kinesis);
    +           this.streamToLastSeenShard = 
checkNotNull(streamToLastSeenShard);
    +
    +           this.totalNumberOfConsumerSubtasks = 
fetcherRef.getSubtaskRuntimeContext().getNumberOfParallelSubtasks();
    +           this.indexOfThisConsumerSubtask = 
fetcherRef.getSubtaskRuntimeContext().getIndexOfThisSubtask();
    +
    +           // we initially map the last seen shard of each subscribed 
stream to null;
    +           // the correct values will be set later on in the constructor
    +           for (String stream : fetcherRef.getSubscribedStreams()) {
    +                   this.streamToLastSeenShard.put(stream, null);
    +           }
    +
    +           // if we are restoring from a checkpoint, the restored state 
should already be in the pending shards queue;
    +           // we iterate over the pending shards queue, and accordingly 
set the stream-to-last-seen-shard map
    +           if (fetcherRef.isRestoredFromCheckpoint()) {
    +                   if (fetcherRef.getCurrentCountOfPendingShards() == 0) {
    +                           throw new RuntimeException("Told to restore 
from checkpoint, but no shards found in discovered shards queue");
    +                   }
    +
    +                   for (KinesisStreamShardState shardState : 
fetcherRef.cloneCurrentPendingShards()) {
    +                           String stream = 
shardState.getKinesisStreamShard().getStreamName();
    +                           String shardId = 
shardState.getKinesisStreamShard().getShard().getShardId();
    +                           if 
(!this.streamToLastSeenShard.containsKey(stream)) {
    +                                   throw new RuntimeException(
    +                                           "pendingShards queue contains a 
shard belonging to a stream that we are not subscribing to");
    +                           } else {
    +                                   String lastSeenShardIdOfStream = 
this.streamToLastSeenShard.get(stream);
    +                                   // the existing shards in the queue may 
not be in ascending id order,
    +                                   // so we must exhaustively find the 
largest shard id of each stream
    +                                   if (lastSeenShardIdOfStream == null) {
    +                                           // if not previously set, 
simply put as the last seen shard id
    +                                           
this.streamToLastSeenShard.put(stream, shardId);
    +                                   } else if 
(KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
    +                                           // override if we have found a 
shard with a greater shard id for the stream
    +                                           
this.streamToLastSeenShard.put(stream, shardId);
    +                                   }
    +                           }
    +                   }
    +           }
    +
    +           // we always query for any new shards that may have been 
created while the Kinesis consumer was not running -
    +           // when we are starting fresh (not restoring from a 
checkpoint), this simply means all existing shards of streams
    +           // we are subscribing to are new shards; when we are restoring 
from checkpoint, any new shards due to Kinesis
    +           // resharding from the time of the checkpoint will be 
considered new shards.
    +
    +           SentinelSequenceNumber sentinelSequenceNumber;
    +           if (!fetcherRef.isRestoredFromCheckpoint()) {
    +                   // if starting fresh, each new shard should start from 
the user-configured position
    +                   sentinelSequenceNumber =
    +                           
KinesisConfigUtil.getInitialPositionAsSentinelSequenceNumber(fetcherRef.getConsumerConfiguration());
    +           } else {
    +                   // if restoring from checkpoint, the new shards due to 
Kinesis resharding should be read from earliest record
    +                   sentinelSequenceNumber = 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM;
    +           }
    +
    +           try {
    +                   // query for new shards that we haven't seen yet up to 
this point
    +                   
discoverNewShardsAndSetInitialStateTo(sentinelSequenceNumber.toString());
    +           } catch (InterruptedException iex) {
    +                   fetcherRef.stopWithError(iex);
    +                   return;
    +           }
    +
    +           boolean hasShards = false;
    +           StringBuilder streamsWithNoShardsFound = new StringBuilder();
    +           for (Map.Entry<String, String> streamToLastSeenShardEntry : 
streamToLastSeenShard.entrySet()) {
    +                   if (streamToLastSeenShardEntry.getValue() != null) {
    +                           hasShards = true;
    +                   } else {
    +                           
streamsWithNoShardsFound.append(streamToLastSeenShardEntry.getKey()).append(", 
");
    +                   }
    +           }
    +
    +           if (streamsWithNoShardsFound.length() != 0 && 
LOG.isWarnEnabled()) {
    +                   LOG.warn("Subtask {} has failed to find any shards for 
the following subscribed streams: {}",
    +                           indexOfThisConsumerSubtask, 
streamsWithNoShardsFound.toString());
    +           }
    +
    +           if (!hasShards) {
    +                   fetcherRef.stopWithError(new RuntimeException(
    +                           "No shards can be found for all subscribed 
streams: " + fetcherRef.getSubscribedStreams()));
    +           }
    +   }
    +
    +   @Override
    +   public void run() {
    +           try {
    +                   while (isRunning()) {
    +                           
discoverNewShardsAndSetInitialStateTo(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.toString());
    +                   }
    +           } catch (Throwable throwable) {
    +                   fetcherRef.stopWithError(throwable);
    +           }
    +   }
    +
    +   private boolean isRunning() {
    +           return !Thread.interrupted();
    +   }
    +
    +   /**
    +    * A utility function that does the following:
    +    *
    +    * 1. Find new shards for each stream that we haven't seen before
    +    * 2. For each new shard, determine whether this consumer subtask 
should subscribe to them;
    +    *        if yes, add the new shard to the discovered shards queue with 
a specified initial state (starting sequence num)
    +    * 3. Update the stream-to-last-seen-shard map so that we won't get 
shards that we have already seen before
    +    *    the next time this function is called
    +    *
    +    * @param initialState the initial state to assign to each new shard 
that this subtask should subscribe to
    +    */
    +   private void discoverNewShardsAndSetInitialStateTo(String initialState) 
throws InterruptedException {
    +           GetShardListResult shardListResult = kinesis.getShardList(new 
HashMap<>(streamToLastSeenShard));
    --- End diff --
    
    Why are you creating a new HashMap here?


> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to