[
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352298#comment-15352298
]
ASF GitHub Bot commented on FLINK-3231:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2131#discussion_r68694102
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer<T> implements Runnable {
--- End diff --
Some feedback for this approach:
At first I had thought of the solution you mentioned. The problem is that
the new shards will not necessarily belong to the subtask that hit the closed
shard (according to the new way we are assigning shards to subtasks in
`ShardDiscoverer#isShouldSubscribeTo()`), so we still need the other subtasks
to continuously poll Kinesis in order for the right subtasks to pick up the new
shards. Simply put, I don't think we can solely rely on the event of
encountering a closed shard to let the new shards get fully picked up by the
correct subtasks.
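To make the issue concrete, here is a minimal, hypothetical sketch of a deterministic hash-modulo assignment in the spirit of `ShardDiscoverer#isShouldSubscribeTo()` (the class and method names mirror the discussion, but this is not the actual connector code): since a child shard's id generally hashes differently from its parent's, the subtask that observes the closed parent shard is not guaranteed to be the one responsible for the children.

```java
// Hypothetical sketch: deterministic shard-to-subtask assignment by hash modulo.
// A child shard created by a split/merge usually hashes to a different subtask
// than its parent, so the subtask that hit the closed shard cannot, on its own,
// hand the child shards to the correct owners.
public class ShardAssignmentSketch {

    /** Returns the subtask index responsible for the given shard id. */
    public static int assignedSubtask(String shardId, int totalSubtasks) {
        // Mask the sign bit instead of Math.abs (which overflows on Integer.MIN_VALUE).
        return (shardId.hashCode() & Integer.MAX_VALUE) % totalSubtasks;
    }

    /** True if the subtask with the given index should subscribe to the shard. */
    public static boolean isShouldSubscribeTo(String shardId, int subtaskIndex, int totalSubtasks) {
        return assignedSubtask(shardId, totalSubtasks) == subtaskIndex;
    }
}
```

Because every subtask can evaluate this check locally and deterministically, each one still has to poll the shard list itself to learn about the shards assigned to it.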
We could drop the assignment in `ShardDiscoverer#isShouldSubscribeTo()` and
simply make each subtask responsible for the new child shards of any closed
shard it was previously consuming. However, with this approach we'll be subject
to severe load imbalance, because Kinesis users can freely choose which shards
to split and merge. To counter this we would need a rebalance mechanism, as
well as a "merged snapshot" on restore after failure.
> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
> Issue Type: Sub-task
> Components: Kinesis Connector, Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis
> users can choose to "merge" and "split" shards at any time for adjustable
> stream throughput capacity. This article explains this quite clearly:
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic
> version of the Kinesis consumer
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task
> mapping is done in a simple round-robin-like distribution which can be
> locally determined at each Flink consumer task (Flink Kafka consumer does
> this too).
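The static round-robin distribution described above can be illustrated with a short sketch (hypothetical names, not the actual connector code): each consumer task lists all shards in a fixed order and keeps the ones whose index is congruent to its own subtask index modulo the parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of static round-robin shard-to-task mapping,
// determined locally by each consumer subtask from the full shard list.
public class RoundRobinAssignment {

    /** Returns the shards this subtask should consume. */
    public static List<String> shardsForSubtask(
            List<String> allShards, int subtaskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            // Keep every parallelism-th shard, offset by this subtask's index.
            if (i % parallelism == subtaskIndex) {
                assigned.add(allShards.get(i));
            }
        }
        return assigned;
    }
}
```

Note that this only works while every subtask sees the same shard list in the same order; once Kinesis resharding changes that list at runtime, the purely local mapping breaks down.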
> To handle Kinesis resharding, we will need some way to let the Flink consumer
> tasks coordinate which shards they are currently handling, and allow the
> tasks to ask the coordinator for a shard reassignment when a task encounters
> a closed shard at runtime (shards are closed by Kinesis when they are merged
> or split).
> We need a centralized coordinator state store that is visible to all Flink
> consumer tasks. Tasks can use this state store to locally determine which
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the
> coordination, but as described in
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use
> KCL for the implementation of the consumer if we want to leverage Flink's
> checkpointing mechanics. For our own implementation, ZooKeeper could be used
> for this state store, but that would require users to set up ZooKeeper for
> the consumer to work.
> Since this feature introduces extensive work, it is opened as a separate
> sub-task from the basic implementation
> https://issues.apache.org/jira/browse/FLINK-3229.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)