[
https://issues.apache.org/jira/browse/BEAM-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Work on BEAM-6859 started by Michael Luckey.
--------------------------------------------
> Spark-runner: DoFn tearDown function will not be invoked if there is no data
> in the batch
> -----------------------------------------------------------------------------------------
>
> Key: BEAM-6859
> URL: https://issues.apache.org/jira/browse/BEAM-6859
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.8.0, 2.9.0, 2.10.0, 2.11.0
> Reporter: Ron Cai
> Assignee: Michael Luckey
> Priority: Critical
>
> In the implementation of
> [MultiDoFnFunction|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java],
>
> {code:java}
> @Override
> public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(
>     Iterator<WindowedValue<InputT>> iter) throws Exception {
>   if (!wasSetupCalled) {
>     DoFnInvokers.invokerFor(doFn).invokeSetup();
>     wasSetupCalled = true;
>   }
>   ...
>   return new SparkProcessContext<>(
>           doFn,
>           doFnRunnerWithMetrics,
>           outputManager,
>           stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
>       .processPartition(iter)
>       .iterator();
> }{code}
> This calls the setup function of the DoFn on every batch in Spark streaming,
> while the tearDown function of the DoFn is invoked by the
> [SparkProcessContext|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java]
> instance. But in the implementation of SparkProcessContext.processPartition(),
> if the partition is empty, it returns an empty ArrayList directly. So if there
> is no data in the batch, the tearDown function of the DoFn is never invoked,
> because tearDown is only called from the ProcCtxtIterator instance, which is
> created only when there is data (partition.hasNext() == true).
> {code:java}
> Iterable<OutputT> processPartition(Iterator<WindowedValue<FnInputT>> partition)
>     throws Exception {
>
>   // skip if partition is empty.
>   if (!partition.hasNext()) {
>     return new ArrayList<>();
>   }
>
>   // process the partition; finishBundle() is called from within the output iterator.
>   return this.getOutputIterable(partition, doFnRunner);
> }
> {code}
> To reproduce the issue, build a pipeline that reads from Kafka with
> KafkaIO.read() and writes back with KafkaIO.write(), run it as a Spark
> streaming application, and do not send any data to the Kafka topic. The
> thread count of the Kafka producer will keep increasing until the
> application eventually runs out of memory (OOM).
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)