[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006677#comment-16006677 ]
ASF GitHub Bot commented on FLINK-4022:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3746#discussion_r115996970
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -106,31 +139,69 @@ protected AbstractFetcher(
}
}
- // create our partition state according to the timestamp/watermark mode
- this.subscribedPartitionStates = initializeSubscribedPartitionStates(
- assignedPartitionsWithInitialOffsets,
+ this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+ // initialize subscribed partition states with seed partitions
+ this.subscribedPartitionStates = createPartitionStateHolders(
+ seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
- watermarksPeriodic, watermarksPunctuated,
+ watermarksPeriodic,
+ watermarksPunctuated,
userCodeClassLoader);
- // check that all partition states have a defined offset
+ // check that all seed partition states have a defined offset
for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
- throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets.");
+ throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
}
}
-
+
+ // all seed partitions are not assigned yet, so should be added to the unassigned partitions queue
+ for (KafkaTopicPartitionState<KPH> partition : subscribedPartitionStates) {
+ unassignedPartitionsQueue.add(partition);
+ }
+
// if we have periodic watermarks, kick off the interval scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
- KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
- (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) subscribedPartitionStates;
-
- PeriodicWatermarkEmitter periodicEmitter =
- new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
+ @SuppressWarnings("unchecked")
+ PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(
--- End diff --
Could add the generic parameter `KPH` here.
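For illustration, a minimal sketch of the reviewer's suggestion using hypothetical stand-ins (`PartitionState<KPH>`, `WatermarkEmitter<KPH>`, `SeedPartitions`), not Flink's actual `KafkaTopicPartitionState` or `PeriodicWatermarkEmitter`; `LinkedBlockingQueue` stands in for Flink's internal `ClosableBlockingQueue`. When the emitter carries the same type parameter as the partition states, the typed seed list can be passed directly, with no raw type or unchecked array cast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical per-partition state; KPH is the version-specific partition handle type.
class PartitionState<KPH> {
    final KPH handle;

    PartitionState(KPH handle) {
        this.handle = handle;
    }
}

// Hypothetical emitter, generic in KPH, so callers need neither raw types
// nor an unchecked cast from an array of partition states.
class WatermarkEmitter<KPH> {
    private final List<PartitionState<KPH>> partitions;

    WatermarkEmitter(List<PartitionState<KPH>> partitions) {
        this.partitions = new ArrayList<>(partitions);
    }

    List<PartitionState<KPH>> partitions() {
        return partitions;
    }
}

class SeedPartitions {
    // Queue every seed partition as "unassigned", mirroring the loop added in the diff.
    static <KPH> LinkedBlockingQueue<PartitionState<KPH>> enqueueSeeds(List<PartitionState<KPH>> seeds) {
        LinkedBlockingQueue<PartitionState<KPH>> queue = new LinkedBlockingQueue<>();
        queue.addAll(seeds);
        return queue;
    }
}
```

The same `List<PartitionState<KPH>>` feeds both the emitter and the queue, so the type parameter is checked end to end.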
> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector, Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are
> added to Kafka.
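The example pattern is written glob-style. Since the proposed constructors take a `java.util.regex.Pattern`, the regex equivalent would be `topic-n.*`: in a regex, `n*` means "zero or more `n` characters", so `topic-n*` would not match `topic-n1`. A minimal sketch of selecting topics by full-name match (the helper `matchingTopics` and the topic names are hypothetical):

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TopicPatternDemo {
    // Keeps the topics whose whole name matches the pattern;
    // Matcher#matches() requires a full match, not just a matching prefix.
    static List<String> matchingTopics(Pattern subscription, List<String> topics) {
        return topics.stream()
                .filter(topic -> subscription.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```

For example, `matchingTopics(Pattern.compile("topic-n.*"), Arrays.asList("topic-n1", "topic-n2", "topic-m1"))` returns `[topic-n1, topic-n2]`.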
> I propose implementing this feature as follows:
> Since the overall list of partitions to read will change after job
> submission, the main change required for this feature is dynamic
> partition assignment to subtasks while the Kafka consumer is running. This
> will mainly be accomplished using the Kafka 0.9.x API
> `KafkaConsumer#subscribe(java.util.regex.Pattern,
> ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will join
> the same consumer group when instantiated, relying on Kafka to
> dynamically reassign partitions to it whenever a rebalance happens. The
> registered `ConsumerRebalanceListener` is a callback invoked right
> before and right after a rebalance. We'll use this callback to let each
> subtask commit the last offsets of the partitions it is currently responsible
> for to an external store (or Kafka) before a rebalance. After the rebalance,
> once a subtask gets the new partitions it will read from, it reads
> the external store to get the last offsets for those partitions
> (partitions without offset entries in the store are the new partitions
> that caused the rebalance).
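The commit-before / look-up-after handoff described above can be sketched in plain Java. `RebalanceCallback` below is a hypothetical interface that only mirrors the two methods of Kafka's `ConsumerRebalanceListener` (`onPartitionsRevoked` and `onPartitionsAssigned`); partitions are plain strings and the "external store" is an in-memory map, both assumptions made for illustration.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in mirroring the two callbacks of Kafka's ConsumerRebalanceListener.
interface RebalanceCallback {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

class OffsetHandoffListener implements RebalanceCallback {
    // Marker for "no stored offset": the partition is new.
    static final long NO_OFFSET = -1L;

    // Shared external store (assumed): partition -> last processed offset.
    private final Map<String, Long> externalStore;
    // Offsets this subtask currently tracks for the partitions it owns.
    final Map<String, Long> localOffsets = new HashMap<>();

    OffsetHandoffListener(Map<String, Long> externalStore) {
        this.externalStore = externalStore;
    }

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        // Before the rebalance: publish the last offsets of the partitions being given up.
        for (String partition : partitions) {
            Long offset = localOffsets.remove(partition);
            if (offset != null) {
                externalStore.put(partition, offset);
            }
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        // After the rebalance: resume from the stored offset, or mark the partition as new.
        for (String partition : partitions) {
            localOffsets.put(partition, externalStore.getOrDefault(partition, NO_OFFSET));
        }
    }
}
```

A partition mapped to `NO_OFFSET` had no stored entry, so it is one of the new partitions; how to choose its start position is left open in this sketch.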
> The tricky part will be restoring Flink checkpoints when the partition
> assignment is dynamic. Snapshotting will remain the same: subtasks snapshot
> the offsets of the partitions they currently hold. Restoring will be
> different, in that a subtask might be assigned partitions that do not match
> the snapshot it is restored with (since we're letting Kafka
> dynamically assign partitions). This requires a coordination process:
> if restore state exists, all subtasks first commit the offsets they
> receive (from the restore state) to the external store, and then
> each subtask looks up the last offsets for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is
> available, restoring becomes simple again: each subtask has full
> access to the previous global state, so no coordination is required.
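A minimal sketch of that two-phase restore, assuming subtasks are threads in one JVM sharing an in-memory store; the `CyclicBarrier` stands in for whatever distributed synchronization a real job would need, and `RestoreCoordination.restore` is a hypothetical helper. Each subtask publishes its restored offsets, waits until all subtasks have done so, and only then looks up offsets for the partitions Kafka assigned to it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

class RestoreCoordination {
    // restored.get(i): offsets from subtask i's checkpoint state.
    // assigned.get(i): partitions Kafka assigned to subtask i after the restart.
    // Returns, per subtask, the offsets found for its assigned partitions.
    static List<Map<String, Long>> restore(List<Map<String, Long>> restored,
                                           List<List<String>> assigned) {
        int parallelism = restored.size();
        Map<String, Long> store = new ConcurrentHashMap<>(); // shared "external store"
        CyclicBarrier allPublished = new CyclicBarrier(parallelism);
        List<Map<String, Long>> results = new ArrayList<>();
        List<Thread> subtasks = new ArrayList<>();

        for (int i = 0; i < parallelism; i++) {
            final int index = i;
            final Map<String, Long> found = new HashMap<>();
            results.add(found);
            Thread subtask = new Thread(() -> {
                try {
                    // Phase 1: publish the restored offsets.
                    store.putAll(restored.get(index));
                    // Block until every subtask has published.
                    allPublished.await();
                    // Phase 2: look up offsets for the currently assigned partitions;
                    // partitions without an entry are treated as new.
                    for (String partition : assigned.get(index)) {
                        Long offset = store.get(partition);
                        if (offset != null) {
                            found.put(partition, offset);
                        }
                    }
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            subtasks.add(subtask);
            subtask.start();
        }
        for (Thread subtask : subtasks) {
            try {
                subtask.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return results;
    }
}
```

With a globally merged restore state, phase 1 and the barrier would be unnecessary, since every subtask could read the merged offsets directly.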
> I think changing to dynamic partition assignment is also good in the long run
> for handling topic repartitioning.
> Overall:
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
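The new `Pattern` constructors, together with point 3 of the implementation changes (choosing the `subscribe()` overload from what the user supplied), amount to a dispatch on the subscription type. A minimal sketch with hypothetical types: `TopicSubscription` records either a fixed topic list or a regex `Pattern`, and `Subscriber` only mirrors the shape of `KafkaConsumer#subscribe`'s collection and `Pattern` overloads (the rebalance listener argument is omitted).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

// Hypothetical interface mirroring the shape of KafkaConsumer's
// subscribe(Collection<String>, ...) and subscribe(Pattern, ...) overloads.
interface Subscriber {
    void subscribe(Collection<String> topics);
    void subscribe(Pattern pattern);
}

// Hypothetical holder: exactly one of fixedTopics / topicPattern is set.
final class TopicSubscription {
    private final List<String> fixedTopics;
    private final Pattern topicPattern;

    private TopicSubscription(List<String> fixedTopics, Pattern topicPattern) {
        this.fixedTopics = fixedTopics;
        this.topicPattern = topicPattern;
    }

    static TopicSubscription ofTopics(List<String> topics) {
        return new TopicSubscription(Collections.unmodifiableList(new ArrayList<>(topics)), null);
    }

    static TopicSubscription ofPattern(Pattern pattern) {
        return new TopicSubscription(null, Objects.requireNonNull(pattern));
    }

    // Picks the subscribe() overload based on what the user supplied.
    void applyTo(Subscriber consumer) {
        if (topicPattern != null) {
            consumer.subscribe(topicPattern);
        } else {
            consumer.subscribe(fixedTopics);
        }
    }
}
```

Because the choice happens once in `applyTo`, the fetcher logic after subscription can be the same for both modes.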
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assignment to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign
> static partitions, use subscribe(), registering the rebalance callback
> described above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold
> before continuing (unless we are able to have merged restore states).
> 3. For the existing consumer functionality (consuming from a fixed list of
> topics), KafkaConsumer#subscribe() has a corresponding overload for a fixed
> list of topics. We can simply choose which subscribe() overload to use
> depending on whether a regex Pattern or a list of topics is supplied.
> 4. If a subtask initially has no assigned partitions, it shouldn't
> emit a MAX_VALUE watermark, since it may be assigned partitions after a
> rebalance. Instead, unassigned subtasks should also run a fetcher instance
> and take part in the consumer group of the subscribed topics as a pool of
> standby consumers.
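Point 4 can be sketched as follows, assuming a subtask's watermark is the minimum of its partitions' watermarks (`SubtaskWatermark.currentWatermark` is a hypothetical helper, not Flink's implementation). A subtask holding no partitions reports that no watermark should be emitted instead of `Long.MAX_VALUE`, so records from partitions it receives after a rebalance are not already behind a watermark it has emitted.

```java
import java.util.Collection;
import java.util.OptionalLong;

class SubtaskWatermark {
    // Returns the watermark to emit, or empty if the subtask currently holds no
    // partitions. Returning Long.MAX_VALUE there would be wrong with dynamic
    // assignment, because partitions may still be assigned after a rebalance.
    static OptionalLong currentWatermark(Collection<Long> partitionWatermarks) {
        if (partitionWatermarks.isEmpty()) {
            return OptionalLong.empty();
        }
        long min = Long.MAX_VALUE;
        for (long watermark : partitionWatermarks) {
            min = Math.min(min, watermark);
        }
        return OptionalLong.of(min);
    }
}
```

For example, `currentWatermark(Arrays.asList(30L, 10L, 20L))` yields a watermark of 10, while an empty collection yields `OptionalLong.empty()`.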