GitHub user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3746#discussion_r114793528
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
---
@@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) throws Exception {
throw new Exception("The partitions were not set for the consumer");
}
- // we need only do work, if we actually have partitions assigned
- if (!subscribedPartitionsToStartOffsets.isEmpty()) {
-
- // create the fetcher that will communicate with the Kafka brokers
- final AbstractFetcher<T, ?> fetcher = createFetcher(
- sourceContext,
- subscribedPartitionsToStartOffsets,
- periodicWatermarkAssigner,
- punctuatedWatermarkAssigner,
- (StreamingRuntimeContext) getRuntimeContext(),
- offsetCommitMode);
-
- // publish the reference, for snapshot-, commit-, and cancel calls
- // IMPORTANT: We can only do that now, because only now will calls to
- // the fetchers 'snapshotCurrentState()' method return at least
- // the restored offsets
- this.kafkaFetcher = fetcher;
- if (!running) {
- return;
- }
-
- // (3) run the fetcher' main work method
- fetcher.runFetchLoop();
+ this.runThread = Thread.currentThread();
+
+ // mark the subtask as temporarily idle if there are no initial seed partitions;
+ // once this subtask discovers some partitions and starts collecting records, the subtask's
+ // status will automatically be triggered back to be active.
+ if (subscribedPartitionsToStartOffsets.isEmpty()) {
+ sourceContext.markAsTemporarilyIdle();
}
- else {
- // this source never completes, so emit a Long.MAX_VALUE watermark
- // to not block watermark forwarding
- sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
- // wait until this is canceled
- final Object waitLock = new Object();
+ // create the fetcher that will communicate with the Kafka brokers
+ final AbstractFetcher<T, ?> fetcher = createFetcher(
+ sourceContext,
+ subscribedPartitionsToStartOffsets,
+ periodicWatermarkAssigner,
+ punctuatedWatermarkAssigner,
+ (StreamingRuntimeContext) getRuntimeContext(),
+ offsetCommitMode);
+
+ // publish the reference, for snapshot-, commit-, and cancel calls
+ // IMPORTANT: We can only do that now, because only now will calls to
+ // the fetchers 'snapshotCurrentState()' method return at least
+ // the restored offsets
+ this.kafkaFetcher = fetcher;
+
+ if (!running) {
+ return;
+ }
+
+ // depending on whether we were restored with the current state version (1.3),
+ // remaining logic branches off into 2 paths:
+ // 1) New state - main fetcher loop executed as separate thread, with this
+ // thread running the partition discovery loop
+ // 2) Old state - partition discovery is disabled, simply going into the main fetcher loop
+
+ if (!restoredFromOldState) {
+ final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
+ Thread fetcherThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // run the fetcher' main work method
+ kafkaFetcher.runFetchLoop();
--- End diff --
Hmm, as far as I can think of, there actually isn't any good reason why this is required.
One point regarding non-main threads emitting records: the Kafka 0.8 fetcher has actually always emitted elements from different threads. So I didn't really make any assumption about which thread (main or separate) runs the fetcher loop and which one runs the discovery loop.
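Why emitting from a non-main thread is acceptable can be sketched briefly. For Flink sources that checkpoint their state, emitting elements and updating state must not run concurrently with checkpointing, which is achieved by doing both inside a block synchronized on the lock returned by `SourceContext.getCheckpointLock()`. As long as every emitting thread takes that lock, it does not matter which thread emits. The plain-Java class below is a hypothetical stand-in, not Flink code: `MultiThreadEmit`, `emitRecord`, and `snapshotOffsets` are illustrative names, and the private `checkpointLock` object plays the role of the checkpoint lock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink code): several consumer threads, one per
 * partition, emit records and advance offsets while holding one shared lock,
 * so a snapshot taken under the same lock sees each emission together with
 * its offset update.
 */
final class MultiThreadEmit {
    private final Object checkpointLock = new Object();         // stand-in for the checkpoint lock
    private final List<String> emitted = new ArrayList<>();     // guarded by checkpointLock
    private final Map<Integer, Long> offsets = new HashMap<>(); // guarded by checkpointLock

    /** Emits a record and advances its partition's offset atomically with respect to snapshots. */
    void emitRecord(int partition, long offset, String record) {
        synchronized (checkpointLock) {
            emitted.add(record);
            offsets.put(partition, offset);
        }
    }

    /** Copies the current offsets under the lock, as a checkpoint would. */
    Map<Integer, Long> snapshotOffsets() {
        synchronized (checkpointLock) {
            return new HashMap<>(offsets);
        }
    }

    int emittedCount() {
        synchronized (checkpointLock) {
            return emitted.size();
        }
    }

    /** Starts one emitting thread per partition and waits for all of them to finish. */
    static MultiThreadEmit runDemo(int partitions, int recordsPerPartition) throws InterruptedException {
        MultiThreadEmit ctx = new MultiThreadEmit();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            final int partition = p;
            Thread t = new Thread(() -> {
                for (long off = 0; off < recordsPerPartition; off++) {
                    ctx.emitRecord(partition, off, "p" + partition + "-" + off);
                }
            }, "consumer-" + p);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return ctx;
    }

    public static void main(String[] args) throws InterruptedException {
        MultiThreadEmit ctx = runDemo(4, 1000);
        System.out.println(ctx.emittedCount());    // prints 4000
        System.out.println(ctx.snapshotOffsets()); // every partition at offset 999
    }
}
```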
But I think it's also OK to swap the two loops here.
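The swapped arrangement (the calling thread runs the fetch loop, a separate thread runs partition discovery) can be sketched in plain Java. This is a minimal, hypothetical illustration, not the PR's code: `SwappedLoops`, `FakeFetcher`, `run(...)`, and the rule that discovery cancels the fetcher after a fixed number of rounds are assumptions standing in for `AbstractFetcher.runFetchLoop()` and the real discovery logic. It keeps the diff's idea of capturing the other thread's failure in an `AtomicReference<Exception>` and rethrowing it on the thread that `run` returns through.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch (not Flink code): the calling thread runs the fetch loop
 * while a separate thread runs the partition discovery loop. A discovery
 * failure is stored in an AtomicReference and rethrown on the calling thread.
 */
final class SwappedLoops {

    /** Hypothetical stand-in for a fetcher: loops until cancelled. */
    static final class FakeFetcher {
        private final AtomicBoolean running = new AtomicBoolean(true);

        void runFetchLoop() throws InterruptedException {
            while (running.get()) {
                Thread.sleep(1); // pretend to poll the brokers and emit records
            }
        }

        void cancel() {
            running.set(false);
        }
    }

    /**
     * Runs discovery on a separate thread and the fetch loop on the calling thread.
     * After discoveryRounds rounds (or on failure) the discovery thread cancels the
     * fetcher, which is only a shutdown trigger for this demo.
     * Returns the number of completed discovery rounds.
     */
    static int run(int discoveryRounds, boolean failDiscovery) throws Exception {
        final FakeFetcher fetcher = new FakeFetcher();
        final AtomicReference<Exception> discoveryErrorRef = new AtomicReference<>();
        final AtomicInteger rounds = new AtomicInteger();

        Thread discoveryThread = new Thread(() -> {
            try {
                for (int i = 0; i < discoveryRounds; i++) {
                    if (failDiscovery) {
                        throw new IllegalStateException("discovery failed");
                    }
                    rounds.incrementAndGet(); // pretend to discover new partitions
                    Thread.sleep(5);
                }
            } catch (Exception e) {
                discoveryErrorRef.set(e);
            } finally {
                // stop the fetch loop whether discovery ended normally or failed
                fetcher.cancel();
            }
        }, "partition-discovery");
        discoveryThread.start();

        // the calling thread runs the fetch loop
        fetcher.runFetchLoop();
        discoveryThread.join();

        Exception discoveryError = discoveryErrorRef.get();
        if (discoveryError != null) {
            throw new RuntimeException("Discovery thread failed", discoveryError);
        }
        return rounds.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(3, false)); // prints 3
    }
}
```

The sketch only propagates discovery-side failures; a real implementation would also need to stop the discovery thread when the fetch loop fails or the source is cancelled.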