[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996839#comment-15996839 ]
ASF GitHub Bot commented on FLINK-4022:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3746#discussion_r114793528
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) throws Exception {
throw new Exception("The partitions were not set for the consumer");
}
- // we need only do work, if we actually have partitions assigned
- if (!subscribedPartitionsToStartOffsets.isEmpty()) {
-
- // create the fetcher that will communicate with the Kafka brokers
- final AbstractFetcher<T, ?> fetcher = createFetcher(
- sourceContext,
- subscribedPartitionsToStartOffsets,
- periodicWatermarkAssigner,
- punctuatedWatermarkAssigner,
- (StreamingRuntimeContext) getRuntimeContext(),
- offsetCommitMode);
-
- // publish the reference, for snapshot-, commit-, and cancel calls
- // IMPORTANT: We can only do that now, because only now will calls to
- // the fetchers 'snapshotCurrentState()' method return at least
- // the restored offsets
- this.kafkaFetcher = fetcher;
- if (!running) {
- return;
- }
-
- // (3) run the fetcher' main work method
- fetcher.runFetchLoop();
+ this.runThread = Thread.currentThread();
+
+ // mark the subtask as temporarily idle if there are no initial seed partitions;
+ // once this subtask discovers some partitions and starts collecting records, the subtask's
+ // status will automatically be triggered back to be active.
+ if (subscribedPartitionsToStartOffsets.isEmpty()) {
+ sourceContext.markAsTemporarilyIdle();
}
- else {
- // this source never completes, so emit a Long.MAX_VALUE watermark
- // to not block watermark forwarding
- sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
- // wait until this is canceled
- final Object waitLock = new Object();
+ // create the fetcher that will communicate with the Kafka brokers
+ final AbstractFetcher<T, ?> fetcher = createFetcher(
+ sourceContext,
+ subscribedPartitionsToStartOffsets,
+ periodicWatermarkAssigner,
+ punctuatedWatermarkAssigner,
+ (StreamingRuntimeContext) getRuntimeContext(),
+ offsetCommitMode);
+
+ // publish the reference, for snapshot-, commit-, and cancel calls
+ // IMPORTANT: We can only do that now, because only now will calls to
+ // the fetchers 'snapshotCurrentState()' method return at least
+ // the restored offsets
+ this.kafkaFetcher = fetcher;
+
+ if (!running) {
+ return;
+ }
+
+ // depending on whether we were restored with the current state version (1.3),
+ // remaining logic branches off into 2 paths:
+ // 1) New state - main fetcher loop executed as separate thread, with this
+ // thread running the partition discovery loop
+ // 2) Old state - partition discovery is disabled, simply going into the main fetcher loop
+
+ if (!restoredFromOldState) {
+ final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
+ Thread fetcherThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // run the fetcher' main work method
+ kafkaFetcher.runFetchLoop();
--- End diff --
Hmm, there actually isn't any good reason this is required, as far as I can
think of.
One point regarding non-main threads emitting elements: the Kafka 0.8 fetcher
has always emitted elements from different threads. So I didn't
really assume which thread (main or separate) runs the fetcher loop and which
one runs the discovery loop.
But I think it's also OK to swap them here.
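The thread arrangement discussed above (fetch loop on a separate thread, discovery loop on the thread that called run(), fetcher errors handed back through an AtomicReference as in the new code in the diff) can be sketched in plain Java as follows. This is a minimal sketch under stated assumptions, not the PR's code: `Fetcher`, `DiscoveryStep`, and `TwoLoopRunner` are hypothetical stand-ins, and the fixed sleep between discovery rounds is an assumption. Swapping the two loops, as suggested, would only change which work is handed to the new thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the fetcher; not Flink's AbstractFetcher.
interface Fetcher {
    void runFetchLoop() throws Exception;
    void cancel();
}

// Hypothetical stand-in for one round of partition discovery.
interface DiscoveryStep {
    void discoverOnce() throws Exception;
}

final class TwoLoopRunner {
    private final Fetcher fetcher;
    private final DiscoveryStep discovery;
    private final long discoveryIntervalMillis; // assumed fixed interval between rounds
    private volatile boolean running = true;

    TwoLoopRunner(Fetcher fetcher, DiscoveryStep discovery, long discoveryIntervalMillis) {
        this.fetcher = fetcher;
        this.discovery = discovery;
        this.discoveryIntervalMillis = discoveryIntervalMillis;
    }

    void cancel() {
        running = false;
        fetcher.cancel();
    }

    // Fetch loop on a separate thread, discovery loop on the calling thread.
    void run() throws Exception {
        final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
        Thread fetcherThread = new Thread(() -> {
            try {
                fetcher.runFetchLoop();
            } catch (Exception e) {
                fetcherErrorRef.set(e); // hand the failure back to the calling thread
            } finally {
                running = false; // stop discovering once the fetcher exits
            }
        }, "kafka-fetcher");
        fetcherThread.start();

        try {
            while (running) {
                discovery.discoverOnce();
                Thread.sleep(discoveryIntervalMillis);
            }
        } catch (Exception e) {
            // discovery failed or was interrupted: stop the fetcher before rethrowing
            cancel();
            fetcherThread.join();
            throw e;
        }

        fetcherThread.join();
        Exception fetcherError = fetcherErrorRef.get();
        if (fetcherError != null) {
            throw fetcherError;
        }
    }
}
```

Because the fetcher thread clears `running` when it exits, the discovery loop ends on its own after a fetcher failure, and the caller sees the fetcher's exception from `run()`.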
> Partition discovery / regex topic subscription for the Kafka consumer
> ---------------------------------------------------------------------
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector, Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are
> added to Kafka.
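A note on the example above: in Java regex syntax, `topic-n*` means "topic-" followed by zero or more "n" characters, so assuming a glob-style wildcard was intended, the regex would be `topic-n.*`. The plain-JDK sketch below (the helper `TopicPatternExample.matchingTopics` is hypothetical) filters topic names with a compiled `Pattern`, requiring the whole name to match. It illustrates matching semantics only, not how Kafka evaluates a pattern subscription internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class TopicPatternExample {
    // Returns the topics whose whole name matches the subscription pattern, in input order.
    static List<String> matchingTopics(Pattern subscription, List<String> allTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            // matches() must cover the entire name, unlike find()
            if (subscription.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }
}
```

For instance, `matchingTopics(Pattern.compile("topic-n.*"), Arrays.asList("topic-n1", "topic-n2", "topic-x"))` returns `[topic-n1, topic-n2]`, while the literal regex `topic-n*` matches none of those three names.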
> I propose to implement this feature as follows:
> Since the overall list of partitions to read will change after job
> submission, the main change required for this feature will be dynamic
> partition assignment to subtasks while the Kafka consumer is running. This
> will mainly be accomplished using the Kafka 0.9.x API
> `KafkaConsumer#subscribe(java.util.regex.Pattern,
> ConsumerRebalanceListener)`. The KafkaConsumer in each subtask will be
> added to the same consumer group when instantiated, relying on Kafka to
> dynamically reassign partitions to it whenever a rebalance happens. The
> registered `ConsumerRebalanceListener` is a callback that is called right
> before and after a rebalance. We'll use this callback to let each
> subtask commit the last offsets of the partitions it is currently responsible for to
> an external store (or Kafka) before a rebalance; after the rebalance, once the
> subtasks get the new partitions they'll be reading from, they'll read from
> the external store to get the last offsets for their new partitions
> (partitions that don't have offset entries in the store are the new partitions
> that caused the rebalancing).
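The revoke/assign flow described above can be sketched without a Kafka dependency. `OffsetStore` and `SubtaskRebalanceHandler` are hypothetical; the two callback names mirror the methods of Kafka's `ConsumerRebalanceListener` (`onPartitionsRevoked` / `onPartitionsAssigned`), but partitions are plain strings here and the store is an in-memory map standing in for "an external store (or Kafka)". Starting partitions without a stored offset at 0 is an assumption, and whether an offset means "last consumed" or "next to read" is left open.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical shared store, standing in for "an external store (or Kafka)".
final class OffsetStore {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    void commit(String partition, long offset) { offsets.put(partition, offset); }

    Long lookup(String partition) { return offsets.get(partition); } // null if never committed
}

// Hypothetical per-subtask state driven by rebalance callbacks.
final class SubtaskRebalanceHandler {
    static final long NEW_PARTITION_START_OFFSET = 0L; // assumption for partitions with no entry
    private final OffsetStore store;
    private final Map<String, Long> currentOffsets = new HashMap<>();

    SubtaskRebalanceHandler(OffsetStore store) { this.store = store; }

    // Before a rebalance: commit the offsets of every partition this subtask gives up.
    void onPartitionsRevoked(Collection<String> revoked) {
        for (String partition : revoked) {
            Long offset = currentOffsets.remove(partition);
            if (offset != null) {
                store.commit(partition, offset);
            }
        }
    }

    // After a rebalance: resume from the store; a missing entry marks a brand-new partition.
    void onPartitionsAssigned(Collection<String> assigned) {
        for (String partition : assigned) {
            Long stored = store.lookup(partition);
            currentOffsets.put(partition, stored != null ? stored : NEW_PARTITION_START_OFFSET);
        }
    }

    // Records progress on a partition as records are emitted.
    void advance(String partition, long offset) { currentOffsets.put(partition, offset); }

    Map<String, Long> currentOffsets() { return Collections.unmodifiableMap(currentOffsets); }
}
```

For example, if subtask A holds offsets 42 and 7 for `t-0` and `t-1` when both are revoked, and the rebalance then gives `t-1` and a brand-new `t-2` to subtask B, B resumes `t-1` at 7 and starts `t-2` at 0.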
> The tricky part will be restoring Flink checkpoints when the partition
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot
> the offsets of partitions they are currently holding. Restoring will be a
> bit different in that subtasks might not be assigned partitions matching
> the snapshot they are restored with (since we're letting Kafka
> dynamically assign partitions). There will need to be a coordination process
> where, if a restore state exists, all subtasks first commit the offsets they
> receive (as a result of the restore state) to the external store, and then
> all subtasks attempt to find the last offset for the partitions they are holding.
> However, if the globally merged restore state feature mentioned by
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is
> available, then the restore will be simple again, as each subtask has full
> access to the previous global state, so no coordination is required.
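With a globally merged restore state, the coordination step above disappears: each subtask can resolve its newly assigned partitions directly. A minimal sketch, assuming the merged state is simply the union of every subtask's partition-to-offset snapshot; `GlobalRestoreSketch` and `DEFAULT_START_OFFSET` are hypothetical names, and the sketch does not model any particular Flink state API.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GlobalRestoreSketch {
    // Hypothetical start offset for partitions absent from the restored state (i.e. new ones).
    static final long DEFAULT_START_OFFSET = 0L;

    // Union of all subtasks' snapshots; each partition is assumed to be held by at most one subtask.
    static Map<String, Long> merge(List<Map<String, Long>> perSubtaskSnapshots) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> snapshot : perSubtaskSnapshots) {
            global.putAll(snapshot);
        }
        return global;
    }

    // Each subtask looks up its own assignment in the global view; no coordination round needed.
    static Map<String, Long> startOffsets(Map<String, Long> globalState, Collection<String> assigned) {
        Map<String, Long> result = new TreeMap<>();
        for (String partition : assigned) {
            result.put(partition, globalState.getOrDefault(partition, DEFAULT_START_OFFSET));
        }
        return result;
    }
}
```

Given snapshots `{t-0=10}` and `{t-1=20, t-2=30}`, a subtask that is now assigned `t-2` and `t-3` starts them at 30 and 0 respectively, regardless of which subtask snapshotted `t-2`.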
> I think changing to dynamic partition assignment is also good in the long run
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assignment based on KafkaConsumer#subscribe
> - Remove partition list querying from the constructor
> - Remove static partition assignment to subtasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign
> static partitions, use subscribe() registered with the callback
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold
> before continuing (unless we are able to have merged restore states).
> 3. For the previous consumer functionality (consuming from a fixed list of topics),
> KafkaConsumer#subscribe() has a corresponding overload for a fixed
> list of topics. We can simply decide which subscribe() overload to use
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't
> emit a MAX_VALUE watermark, since they may be assigned partitions after a rebalance.
> Instead, unassigned subtasks should also run a fetcher instance and
> take part as a process pool in the consumer group of the subscribed topics.
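Implementation change 3 boils down to holding either a fixed topic list or a pattern and branching on which one was supplied. The hypothetical `TopicSubscription` class below sketches that choice; in a real consumer the same branch would select the topic-list overload of `subscribe()` or `subscribe(Pattern, ConsumerRebalanceListener)`, whereas here it only decides whether a given topic is covered.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

// Hypothetical: holds exactly one of a fixed topic list or a topic pattern.
final class TopicSubscription {
    private final List<String> fixedTopics; // null when subscribing by pattern
    private final Pattern topicPattern;     // null when subscribing to fixed topics

    private TopicSubscription(List<String> fixedTopics, Pattern topicPattern) {
        this.fixedTopics = fixedTopics;
        this.topicPattern = topicPattern;
    }

    static TopicSubscription ofTopics(List<String> topics) {
        return new TopicSubscription(Collections.unmodifiableList(new ArrayList<>(topics)), null);
    }

    static TopicSubscription ofPattern(Pattern pattern) {
        return new TopicSubscription(null, Objects.requireNonNull(pattern, "pattern"));
    }

    boolean isPattern() {
        return topicPattern != null;
    }

    // The branch that would pick the subscribe() overload: list membership vs. full regex match.
    boolean covers(String topic) {
        return isPattern()
                ? topicPattern.matcher(topic).matches()
                : fixedTopics.contains(topic);
    }
}
```

Keeping both modes behind one type means the rest of the consumer does not need to know which constructor the user called.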