Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3746#discussion_r114760511
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
---
@@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext)
throws Exception {
throw new Exception("The partitions were not set for the consumer");
}
- // we need only do work, if we actually have partitions assigned
- if (!subscribedPartitionsToStartOffsets.isEmpty()) {
-
- // create the fetcher that will communicate with the Kafka brokers
- final AbstractFetcher<T, ?> fetcher = createFetcher(
- sourceContext,
- subscribedPartitionsToStartOffsets,
- periodicWatermarkAssigner,
- punctuatedWatermarkAssigner,
- (StreamingRuntimeContext) getRuntimeContext(),
- offsetCommitMode);
-
- // publish the reference, for snapshot-, commit-, and cancel calls
- // IMPORTANT: We can only do that now, because only now will calls to
- // the fetchers 'snapshotCurrentState()' method return at least
- // the restored offsets
- this.kafkaFetcher = fetcher;
- if (!running) {
- return;
- }
-
- // (3) run the fetcher' main work method
- fetcher.runFetchLoop();
+ this.runThread = Thread.currentThread();
+
+ // mark the subtask as temporarily idle if there are no initial seed partitions;
+ // once this subtask discovers some partitions and starts collecting records, the subtask's
+ // status will automatically be triggered back to be active.
+ if (subscribedPartitionsToStartOffsets.isEmpty()) {
+ sourceContext.markAsTemporarilyIdle();
}
- else {
- // this source never completes, so emit a Long.MAX_VALUE watermark
- // to not block watermark forwarding
- sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
- // wait until this is canceled
- final Object waitLock = new Object();
+ // create the fetcher that will communicate with the Kafka brokers
+ final AbstractFetcher<T, ?> fetcher = createFetcher(
+ sourceContext,
+ subscribedPartitionsToStartOffsets,
+ periodicWatermarkAssigner,
+ punctuatedWatermarkAssigner,
+ (StreamingRuntimeContext) getRuntimeContext(),
+ offsetCommitMode);
+
+ // publish the reference, for snapshot-, commit-, and cancel calls
+ // IMPORTANT: We can only do that now, because only now will calls to
+ // the fetchers 'snapshotCurrentState()' method return at least
+ // the restored offsets
+ this.kafkaFetcher = fetcher;
+
+ if (!running) {
+ return;
+ }
+
+ // depending on whether we were restored with the current state version (1.3),
+ // remaining logic branches off into 2 paths:
+ // 1) New state - main fetcher loop executed as separate thread, with this
+ // thread running the partition discovery loop
+ // 2) Old state - partition discovery is disabled, simply going into the main fetcher loop
+
+ if (!restoredFromOldState) {
+ final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
+ Thread fetcherThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // run the fetcher' main work method
+ kafkaFetcher.runFetchLoop();
--- End diff --
Before this change, the fetcher ran in the Task thread. I'm not sure that is
still strictly necessary, but in the past there were always problems when a
thread other than the Task's main thread was emitting elements.
Is there a good reason not to start the partition discoverer in a separate
thread instead, so that the fetcher stays in the Task thread?
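
To make the question concrete, here is a rough sketch of the alternative
threading layout: the fetch loop stays in the calling (Task) thread and only
the discovery loop moves to a helper thread. This is not Flink code. The class
DiscoveryThreadSketch, the Loop interface, the method runFetchInCallerThread
and the cancelFetch callback are hypothetical names used only for
illustration, and the sketch assumes the fetch loop can be stopped through
such a callback.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not part of Flink: shows only the threading layout.
class DiscoveryThreadSketch {

    /** Hypothetical stand-in for a loop that may fail (fetch loop or discovery loop). */
    interface Loop {
        void run() throws Exception;
    }

    /**
     * Runs the fetch loop in the calling thread and the discovery loop in a
     * helper thread. If discovery fails, cancelFetch is invoked so that the
     * fetch loop can return, and the failure is rethrown to the caller.
     */
    static void runFetchInCallerThread(Loop fetchLoop, Loop discoveryLoop, Runnable cancelFetch)
            throws Exception {
        final AtomicReference<Exception> discoveryError = new AtomicReference<>();

        Thread discoveryThread = new Thread(() -> {
            try {
                discoveryLoop.run();
            } catch (InterruptedException e) {
                // expected during shutdown: the caller interrupts this thread
            } catch (Exception e) {
                // remember the failure and ask the fetch loop to stop
                discoveryError.set(e);
                cancelFetch.run();
            }
        }, "partition-discovery-sketch");
        discoveryThread.setDaemon(true);
        discoveryThread.start();

        try {
            // the fetch loop emits from the caller's thread, as before the change
            fetchLoop.run();
        } finally {
            // stop discovery once the fetch loop has returned or failed
            discoveryThread.interrupt();
            discoveryThread.join();
        }

        Exception error = discoveryError.get();
        if (error != null) {
            throw new Exception("Partition discovery failed", error);
        }
    }

    public static void main(String[] args) throws Exception {
        runFetchInCallerThread(
            () -> System.out.println("fetch loop on: " + Thread.currentThread().getName()),
            () -> Thread.sleep(60_000), // placeholder for a periodic discovery loop
            () -> { });
    }
}
```

With this layout, the thread that emits elements is the same one that calls
run(), and the discovery thread only needs a way to hand newly found
partitions to the fetcher and to report its own failures.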