[
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996621#comment-15996621
]
ASF GitHub Bot commented on FLINK-4022:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3746#discussion_r114757153
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all partition discoverers.
+ *
+ * <p>This partition discoverer base class implements the logic around bookkeeping
+ * discovered partitions, and using the information to determine whether or not there
+ * are new partitions that the consumer subtask should subscribe to.
+ *
+ * <p>Subclass implementations should simply implement the logic of using the version-specific
+ * Kafka clients to fetch topic and partition metadata.
+ *
+ * <p>Since Kafka clients are generally not thread-safe, partition discoverers should
+ * not be concurrently accessed. The only exception for this would be the {@link #wakeup()}
+ * call, which allows the discoverer to be interrupted during a {@link #discoverPartitions()} call.
+ */
+public abstract class AbstractPartitionDiscoverer {
+
+ /** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
+ private final KafkaTopicsDescriptor topicsDescriptor;
+
+ /** Index of the consumer subtask that this partition discoverer belongs to. */
+ private final int indexOfThisSubtask;
+
+ /** The total number of consumer subtasks. */
+ private final int numParallelSubtasks;
+
+ /** Flag to determine whether or not the discoverer is closed. */
+ private volatile boolean closed = true;
+
+ /**
+ * Flag to determine whether or not the discoverer has been woken up.
+ * When set to {@code true}, {@link #discoverPartitions()} will be interrupted as early as possible.
+ * Once interrupted, the flag is reset.
+ */
+ private volatile boolean wakeup;
+
+ /**
+ * Map of topics to their largest discovered partition id seen by this subtask.
+ * This state may be updated whenever {@link AbstractPartitionDiscoverer#discoverPartitions()} or
+ * {@link AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition(KafkaTopicPartition)} is called.
+ *
+ * This is used to remove old partitions from the fetched partition lists. It is sufficient
+ * to keep track of only the largest partition id because Kafka partition numbers can only
+ * increase, and partition ids are assigned incrementally.
+ */
+ private final Map<String, Integer> topicsToLargestDiscoveredPartitionId;
+
+ public AbstractPartitionDiscoverer(
+ KafkaTopicsDescriptor topicsDescriptor,
+ int indexOfThisSubtask,
+ int numParallelSubtasks) {
+
+ this.topicsDescriptor = checkNotNull(topicsDescriptor);
+ this.indexOfThisSubtask = indexOfThisSubtask;
+ this.numParallelSubtasks = numParallelSubtasks;
+ this.topicsToLargestDiscoveredPartitionId = new HashMap<>();
+ }
+
+ /**
+ * Opens the partition discoverer, initializing all required Kafka connections.
+ *
+ * <p>NOTE: thread-safety is not guaranteed.
+ */
+ public void open() throws Exception {
+ closed = false;
+ initializeConnections();
+ }
+
+ /**
+ * Closes the partition discoverer, cleaning up all Kafka connections.
+ *
+ * <p>NOTE: thread-safety is not guaranteed.
+ */
+ public void close() throws Exception {
+ closed = true;
+ closeConnections();
+ }
+
+ /**
+ * Interrupts an in-progress discovery attempt by throwing a {@link WakeupException}.
+ * If no attempt is in progress, the immediate next attempt will throw a {@link WakeupException}.
+ *
+ * <p>This method can be called concurrently from a different thread.
+ */
+ public void wakeup() {
+ wakeup = true;
+ wakeupConnections();
+ }
+
+ /**
+ * Executes a partition discovery attempt for this subtask.
+ * This method lets the partition discoverer update what partitions it has discovered so far.
+ *
+ * @return List of discovered new partitions that this subtask should subscribe to.
+ */
+ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
+ if (!closed && !wakeup) {
+ try {
+ List<KafkaTopicPartition> newDiscoveredPartitions;
+
+ // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic patern
--- End diff --
Typo "patern"
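(For context, the bookkeeping described in the javadoc of this hunk might look roughly like the sketch below. The actual method body is not part of the diff shown, so everything beyond the names visible above is an assumption.)
{code:java}
// A sketch only, not the real implementation from the PR.
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    String topic = partition.getTopic();
    Integer largestSeenId = topicsToLargestDiscoveredPartitionId.get(topic);

    // Kafka partition ids per topic only ever grow incrementally, so a
    // fetched partition is genuinely new iff its id exceeds the largest
    // id this subtask has seen for that topic
    if (largestSeenId == null || partition.getPartition() > largestSeenId) {
        topicsToLargestDiscoveredPartitionId.put(topic, partition.getPartition());
        // (the real method would additionally check whether the partition is
        // assigned to this subtask, based on indexOfThisSubtask and
        // numParallelSubtasks; that check is elided here)
        return true;
    }
    return false;
}
{code}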
> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector, Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job
> submission, the main big change required for this feature will be dynamic
> partition assignment to subtasks while the Kafka consumer is running. This
> will mainly be accomplished using Kafka 0.9.x API
> `KafkaConsumer#subscribe(java.util.regex.Pattern,
> ConsumerRebalanceListener)`. Each KafkaConsumer in each subtask will be
> added to the same consumer group when instantiated, and rely on Kafka to
> dynamically reassign partitions to them whenever a rebalance happens. The
> registered `ConsumerRebalanceListener` is a callback that is called right
> before and after rebalancing happens. We'll use this callback to let each
> subtask commit the last offsets of the partitions it is currently responsible for to
> an external store (or Kafka) before a rebalance; after the rebalance, once the
> subtasks get the new partitions they'll be reading from, they'll read from
> the external store to get the last offsets for their new partitions
> (partitions which don't have offset entries in the store are new partitions
> causing the rebalancing).
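A minimal sketch of the callback this paragraph describes, using Kafka 0.9's ConsumerRebalanceListener API; the OffsetStore interface is a made-up stand-in for "an external store (or Kafka)":
{code:java}
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Assumed minimal store abstraction, for illustration only.
interface OffsetStore {
    void commit(TopicPartition partition, long offset);
    Long lookup(TopicPartition partition); // null if no entry exists
}

class OffsetSyncingRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<?, ?> consumer;
    private final OffsetStore store;

    OffsetSyncingRebalanceListener(KafkaConsumer<?, ?> consumer, OffsetStore store) {
        this.consumer = consumer;
        this.store = store;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // right before the rebalance: persist the last offsets of the
        // partitions this subtask is currently responsible for
        for (TopicPartition tp : partitions) {
            store.commit(tp, consumer.position(tp));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // right after the rebalance: resume each newly assigned partition
        // from the stored offset; partitions without an entry are the new
        // ones that caused the rebalancing
        for (TopicPartition tp : partitions) {
            Long offset = store.lookup(tp);
            if (offset != null) {
                consumer.seek(tp, offset);
            }
        }
    }
}
{code}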
> The tricky part will be restoring Flink checkpoints when the partition
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot
> the offsets of partitions they are currently holding. Restoring will be a
> bit different in that subtasks might not be assigned partitions matching
> the snapshot they are restored with (since we're letting Kafka
> dynamically assign partitions). There will need to be a coordination process
> where, if a restore state exists, all subtasks first commit the offsets they
> receive (as a result of the restore state) to the external store, and then
> all subtasks attempt to find a last offset for the partitions they are holding.
> However, if the globally merged restore state feature mentioned by
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is
> available, then the restore will be simple again, as each subtask has full
> access to the previous global state, and therefore coordination is not required.
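Roughly, the coordination step could look like the sketch below; the barrier and OffsetStore are stand-ins for whatever coordination mechanism would actually be used, and none of this is an existing Flink API:
{code:java}
// Fields assumed to exist on the enclosing fetcher: `consumer`, a shared
// `store` (see the OffsetStore sketch above), and `barrier`, some
// cluster-wide rendezvous across all subtasks.
void coordinateRestore(Map<TopicPartition, Long> restoredState) throws Exception {
    // (1) every subtask first publishes the offsets it was restored with
    for (Map.Entry<TopicPartition, Long> entry : restoredState.entrySet()) {
        store.commit(entry.getKey(), entry.getValue());
    }

    // (2) wait until ALL subtasks have published, otherwise a subtask could
    // look up a partition whose restored offset has not been written yet
    barrier.await();

    // (3) each subtask seeks to the last stored offset for the partitions
    // Kafka has currently assigned to it
    for (TopicPartition tp : consumer.assignment()) {
        Long offset = store.lookup(tp);
        if (offset != null) {
            consumer.seek(tp, offset);
        }
    }
}
{code}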
> I think changing to dynamic partition assignment is also good in the long run
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
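For illustration, usage of the proposed Pattern-based constructor might look like the fragment below; only the constructor signature comes from the list above, the rest is ordinary Flink setup (SimpleStringSchema is Flink's stock DeserializationSchema for strings):
{code:java}
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "flink-consumer-group");

// subscribe to every topic matching the pattern; topics created later
// ("topic-n1", "topic-n2", ...) would be picked up automatically
FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
    java.util.regex.Pattern.compile("topic-n.*"),
    new SimpleStringSchema(),
    props);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer).print();
{code}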
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign
> static partitions, use subscribe() registered with the callback
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics),
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed
> list of topics. We can simply decide which subscribe() overload to use
> depending on whether a regex Pattern or a list of topics is supplied (see the sketch after this list).
> 4. If subtasks don't initially have any assigned partitions, we shouldn't
> emit a MAX_VALUE watermark, since they may hold partitions after a rebalance.
> Instead, unassigned subtasks should also run a fetcher instance and take part
> in the process pool for the consumer group of the subscribed topics.
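And the overload selection from item 3, as a sketch (the field names subscribedPattern, fixedTopics, and rebalanceListener are assumptions; both subscribe() overloads exist on Kafka 0.9's KafkaConsumer):
{code:java}
if (subscribedPattern != null) {
    // regex subscription: Kafka tracks the set of matching topics for us
    kafkaConsumer.subscribe(subscribedPattern, rebalanceListener);
} else {
    // fixed topic list: the corresponding overload for a list of topics
    kafkaConsumer.subscribe(fixedTopics, rebalanceListener);
}
{code}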
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)