Abacn commented on code in PR #32440:
URL: https://github.com/apache/beam/pull/32440#discussion_r3041601190
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java:
##########
@@ -103,21 +103,32 @@ public Boundedness getBoundedness() {
@Override
public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
- createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) {
+ createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
+ return createEnumerator(enumContext, false);
+ }
- return new FlinkSourceSplitEnumerator<>(
- enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
+ public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
+ createEnumerator(
+ SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext, boolean splitInitialized)
+ throws Exception {
+
+ if (boundedness == Boundedness.BOUNDED) {
+ return new LazyFlinkSourceSplitEnumerator<>(
Review Comment:
A race condition was found in LazyFlinkSourceSplitEnumerator, and it currently
affects #38072. When LazyFlinkSourceSplitEnumerator.handleSplitRequest is
called before the callAsync inside
LazyFlinkSourceSplitEnumerator.initializeSplits has finished,
signalNoMoreSplits is issued and the pipeline finishes prematurely, so the
source is never evaluated.
As a check, I added a `Thread.sleep(1000)` to
LazyFlinkSourceSplitEnumerator.handleSplitRequest, and the test passed.
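One possible way to avoid the race without sleeping is to park split requests
that arrive before initialization completes and answer them from the
completion callback. The sketch below is a simplified stand-in, not the actual
Beam or Flink code: `LazyEnumeratorSketch`, `onSplitsReady`, and the string
event log are hypothetical names used only to model the ordering of the two
calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified model of a lazy split enumerator (not the Beam class). */
class LazyEnumeratorSketch {
  private final Queue<String> pendingSplits = new ArrayDeque<>();
  private final Queue<Integer> waitingReaders = new ArrayDeque<>();
  private final List<String> events = new ArrayList<>();
  private boolean splitsInitialized = false;

  /** Models a reader asking for its next split. */
  void handleSplitRequest(int subtaskId) {
    if (!splitsInitialized) {
      // Splits are still being computed: remember the reader instead of
      // telling it that no more splits exist.
      waitingReaders.add(subtaskId);
      return;
    }
    assignOrFinish(subtaskId);
  }

  /** Models the callback that runs once asynchronous split computation is done. */
  void onSplitsReady(List<String> splits) {
    pendingSplits.addAll(splits);
    splitsInitialized = true;
    // Serve every request that arrived too early, in arrival order.
    while (!waitingReaders.isEmpty()) {
      assignOrFinish(waitingReaders.poll());
    }
  }

  private void assignOrFinish(int subtaskId) {
    String split = pendingSplits.poll();
    if (split != null) {
      events.add("assign " + split + " -> " + subtaskId);
    } else {
      // Only reached after initialization, so an empty queue really means "done".
      events.add("noMoreSplits -> " + subtaskId);
    }
  }

  /** Returns a copy of the recorded actions, for inspection. */
  List<String> events() {
    return new ArrayList<>(events);
  }
}
```

With this ordering, a request that arrives early produces no action until the
splits exist, so the "no more splits" signal can only follow a real, completed
initialization.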