Ron Cai created BEAM-6859:
-----------------------------
Summary: DoFn tearDown function will not be invoked if there is no
data in the batch
Key: BEAM-6859
URL: https://issues.apache.org/jira/browse/BEAM-6859
Project: Beam
Issue Type: Bug
Components: runner-spark
Affects Versions: 2.11.0
Reporter: Ron Cai
In the implementation of
[MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java],
{code:java}
@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
    throws Exception {
  if (!wasSetupCalled) {
    DoFnInvokers.invokerFor(doFn).invokeSetup();
    wasSetupCalled = true;
  }
  ...
  return new SparkProcessContext<>(
          doFn,
          doFnRunnerWithMetrics,
          outputManager,
          stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
      .processPartition(iter)
      .iterator();
}{code}
it calls the setup function of the DoFn for every batch in Spark streaming, and the
tearDown function of the DoFn is expected to be invoked by the
[SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java]
instance. However, in the implementation of SparkProcessContext.processPartition(),
if the partition is empty it returns an empty ArrayList instance directly.
tearDown is only invoked from the ProcCtxtIterator instance, which is created
only when the partition has data (partition.hasNext() == true), so if there is
no data in the batch the tearDown function of the DoFn is never invoked.
{code:java}
Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
    throws Exception {
  // skip if partition is empty.
  if (!partition.hasNext()) {
    return new ArrayList<>();
  }

  // process the partition; finishBundle() is called from within the output iterator.
  return this.getOutputIterable(partition, doFnRunner);
}
{code}
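The lifecycle hole above can be illustrated with a minimal, self-contained sketch (not Beam code; LifecycleFn and the processPartition* methods here are illustrative stand-ins). The "buggy" shape short-circuits on an empty partition before the output iterator that would eventually call teardown() is ever created; the "guarded" shape calls teardown() on the empty path as well, which is the kind of fix the runner needs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Illustrative stand-in for a DoFn with setup/teardown lifecycle methods.
class LifecycleFn {
  boolean setupCalled = false;
  boolean teardownCalled = false;
  void setup() { setupCalled = true; }
  void teardown() { teardownCalled = true; }
}

public class TeardownSketch {
  // Buggy shape: an empty partition returns before the code path that
  // calls teardown() is ever reached, leaking whatever setup() acquired.
  static <T> Iterable<T> processPartitionBuggy(LifecycleFn fn, Iterator<T> partition) {
    fn.setup();
    if (!partition.hasNext()) {
      return new ArrayList<>(); // teardown() never runs for this batch
    }
    List<T> out = new ArrayList<>();
    partition.forEachRemaining(out::add);
    fn.teardown(); // stands in for the ProcCtxtIterator teardown path
    return out;
  }

  // Guarded shape: invoke teardown() on the empty path too.
  static <T> Iterable<T> processPartitionFixed(LifecycleFn fn, Iterator<T> partition) {
    fn.setup();
    if (!partition.hasNext()) {
      fn.teardown(); // release resources even when the batch is empty
      return new ArrayList<>();
    }
    List<T> out = new ArrayList<>();
    partition.forEachRemaining(out::add);
    fn.teardown();
    return out;
  }

  public static void main(String[] args) {
    LifecycleFn buggy = new LifecycleFn();
    processPartitionBuggy(buggy, Collections.emptyIterator());
    System.out.println("buggy teardown called: " + buggy.teardownCalled);

    LifecycleFn fixed = new LifecycleFn();
    processPartitionFixed(fixed, Collections.emptyIterator());
    System.out.println("fixed teardown called: " + fixed.teardownCalled);
  }
}
```

With an empty iterator the buggy variant leaves teardownCalled == false while the guarded variant flips it to true, which matches the leak described above.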
With this implementation, if you build a pipeline that reads from Kafka with
KafkaIO.read() and writes back to Kafka with KafkaIO.write(), run it as a Spark
streaming application, and send no data to the source topic, the thread count of
the Kafka producer keeps increasing until the application eventually runs out of
memory (OOM).
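For reference, a pipeline of roughly this shape reproduces the leak. This is a sketch, not a test case: the bootstrap servers and topic names are placeholders, and it assumes the Beam SDK, the beam-sdks-java-io-kafka module, and the Spark runner are on the classpath with a Kafka broker and Spark available, so it cannot be run in isolation.

```java
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaPassthrough {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(SparkRunner.class);
    Pipeline p = Pipeline.create(options);

    // Read from one topic and write straight back to another; with no data
    // on the source topic, every micro-batch is empty, so per-batch setup()
    // runs but tearDown() never does.
    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")   // placeholder
            .withTopic("input-topic")                 // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
     .apply(KafkaIO.<String, String>write()
            .withBootstrapServers("localhost:9092")   // placeholder
            .withTopic("output-topic")                // placeholder
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class));

    p.run().waitUntilFinish();
  }
}
```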
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)